当消息生产的速度长时间,远远大于消费的速度时。就会造成消息堆积。
在用户登录成功之后,会向rabbitmq发送一个登录成功的消息。这个消息可以被多类业务订阅。
登录成功,记录登录日志;登录成功,根据规则送积分。其中登录送积分可以模拟成较为耗时的处理
场景重现:让消息产生堆积
生产者大量发送消息:使用Jmeter开启多线程,循环发送消息大量进入队列。
模拟堆积10万条数据
消费者消费失败:随机抛出异常,模拟消费者消费失败,没有ack(手动ack的时候)。
设置消费者的性能瓶颈:在消费方法中设置休眠时间,模拟性能瓶颈
关闭消费者:停掉消费者,模拟消费者挂掉
消费者端示例核心代码:
public class LoginIntegralComsumer implements MessageListener {
public void onMessage(Message message) {
String jsonString = null;
try {
jsonString = new String(message.getBody(),"UTF8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
if(new Random().nextInt(5)==2){
//模拟发生异常
throw new RuntimeException("模拟处理异常");
}
try {
//模拟耗时的处理过程
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread().getName()+"处理消息:"+jsonString);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1小时处理 60*60=3600条
处理完10万条数据 100000/3600=27.7小时
问题解决:消息已经堆积如何解决
消息队列堆积,想办法把消息转移到一个新的队列,增加服务器慢慢来消费这个消息可以
生产环境的队列可用状态
1、解决消费者消费异常问题
2、解决消费者的性能瓶颈:改短休眠时间
5.4小时。
3、增加消费线程,增加多台服务器部署消费者。快速消费。
增加10个线程
concurrency="10" prefetch="10"
1小时
增加一台服务器
0.5小时
在实际的生产环境中有可能出现一条消息因为一些原因丢失,导致消息没有消费成功,从而造成数据不一致等问题,造成严重的影响,比如:在一个商城的下单业务中,需要生成订单信息和扣减库存两个动作,如果使用RabbitMQ来实现该业务,那么在订单服务下单成功后需要发送一条消息到库存服务进行扣减库存,如果在此过程中,一条消息因为某些原因丢失,那么就会出现下单成功但是库存没有扣减,从而导致超卖的情况,也就是库存已经没有了,但是用户还能下单,这个问题对于商城系统来说是致命的。
消息丢失的场景主要分为:消息在生产者丢失,消息在RabbitMQ丢失,消息在消费者丢失。
消息生产者发送消息成功,但是MQ没有收到该消息,消息在从生产者传输到MQ的过程中丢失,一般是由于网络不稳定的原因。
采用RabbitMQ 发送方消息确认机制,当消息成功被MQ接收到时,会给生产者发送一个确认消息,表示接收成功。RabbitMQ 发送方消息确认模式有以下三种:普通确认模式,批量确认模式,异步监听确认模式。spring整合RabbitMQ后只使用了异步监听确认模式。
说明
异步监听模式,可以实现边发送消息边进行确认,不影响主线程任务执行。
步骤
生产者发送3000条消息
在发送消息前开启开启发送方确认模式
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
/>
在发送消息前添加异步确认监听器
//添加异步确认监听器
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 处理ack
System.out.println("已确认消息,标识:" + correlationData.getId());
} else {
// 处理nack, 此时cause包含nack的原因。
System.out.println("未确认消息,标识:" + correlationData.getId());
System.out.println("未确认原因:" + cause);
//重发
}
}
});
消息成功发送到MQ,消息还没被消费却在MQ中丢失,比如MQ服务器宕机或者重启会出现这种情况
持久化交换机,队列,消息,确保MQ服务器重启时依然能从磁盘恢复对应的交换机,队列和消息。
spring整合后默认开启了交换机,队列,消息的持久化,所以不修改任何设置就可以保证消息不在RabbitMQ丢失。但是为了以防万一,还是可以申明下。
消息费者消费消息时,如果设置为自动回复MQ,消息者端收到消息后会自动回复MQ服务器,MQ则会删除该条消息,如果消息已经在MQ被删除但是消费者的业务处理出现异常或者消费者服务宕机,那么就会导致该消息没有处理成功从而导致该条消息丢失。
设置为手动回复MQ服务器,当消费者出现异常或者服务宕机时,MQ服务器不会删除该消息,而是会把消息重发给绑定该队列的消费者,如果该队列只绑定了一个消费者,那么该消息会一直保存在MQ服务器,直到消息者能正常消费为止。本解决方案以一个队列绑定多个消费者为例来说明,一般在生产环境上也会让一个队列绑定多个消费者也就是工作队列模式来减轻压力,提高消息处理效率
MQ重发消息场景:
1.消费者未响应ACK,主动关闭频道或者连接
2.消费者未响应ACK,消费者服务挂掉
场景1
当RabbitMQ采用work Queue模式,此时只会有一个Queue但是会有多个Consumer,同时多个Consumer直接是竞争关系,此时就会出现MQ消息乱序的问题。
场景2
当RabbitMQ采用简单队列模式的时候,如果消费者采用多线程的方式来加速消息的处理,此时也会出现消息乱序的问题。
为了防止消息在消费者端丢失,会采用手动回复MQ的方式来解决,同时也引出了一个问题,消费者处理消息成功,手动回复MQ时由于网络不稳定,连接断开,导致MQ没有收到消费者回复的消息,那么该条消息还会保存在MQ的消息队列,由于MQ的消息重发机制,会重新把该条消息发给和该队列绑定的消息者处理,这样就会导致消息重复消费。而有些操作是不允许重复消费的,比如下单,减库存,扣款等操作。
MQ重发消息场景:
1.消费者未响应ACK,主动关闭频道或者连接
2.消费者未响应ACK,消费者服务挂掉
如果消费消息的业务是幂等性操作(同一个操作执行多次,结果不变)就算重复消费也没问题,可以不做处理,如果不支持幂等性操作,如:下单,减库存,扣款等,那么可以在消费者端每次消费成功后将该条消息id保存到数据库,每次消费前查询该消息id,如果该条消息id已经存在那么表示已经消费过就不再消费否则就消费。本方案采用redis存储消息id,因为redis是单线程的,并且性能也非常好,提供了很多原子性的命令,本方案使用setnx命令存储消息id。
setnx(key,value):如果key不存在则插入成功且返回1,如果key存在,则不进行任何操作,返回0
原文:https://www.cnblogs.com/dalianpai/p/13607665.html