首页 > 其他 > 详细

二、RabbitMQ之延时消息(2)

时间:2020-09-01 10:56:09      阅读:50      评论:0      收藏:0      [点我收藏+]

rabbitmq安装延时插件 rabbitmq_delayed_message_exchange

1.到官网https://www.rabbitmq.com/community-plugins.html,下载对应版本的rabbitmq_delayed_message_exchange

技术分享图片

 

 

 

2.将插件拷贝到rabbitmq的plugins目录下,我本地使用的docker启动的rabbitmq服务,使用命令 docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 容器ID:/plugins

3.进入容器内部,docker exec -it 5af /bin/bash, 进入plugins目录,查看是否拷贝成功 cd /plugins

技术分享图片

 

 

 

4.启用延时插件,执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

技术分享图片

 

 

 5.打开web页面的Exchange模块,可以看见多了一种类型(PS:如果没有看见,可以重启一下服务)

技术分享图片

 

 

 6.测试

import org.springframework.amqp.core.*;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
 
@Configuration
public class MQExchangeConfig {

    public static final String CUSTOM_EXCHANGE_NAME = "custom-Exchange";
   
    public static final String QUEUEA_NAME = "queueA";
    public static final String QUEUEB_NAME = "queueB";
    public static final String ROUTING_KEY_A_NAME = "routingKeyA";
    public static final String ROUTING_KEY_B_NAME = "routingKeyB";


    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
//路由策略,必填项,参考ExchangeTypes args.put("x-delayed-type", "direct"); return new CustomExchange(CUSTOM_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean Queue queueA() { return new Queue(QUEUEA_NAME); } @Bean Queue queueB() { return new Queue(QUEUEB_NAME); } @Bean Binding bindingAC(Queue queueA, CustomExchange customExchange) { return BindingBuilder.bind(queueA).to(customExchange).with(ROUTING_KEY_A_NAME).noargs(); } @Bean Binding bindingBC(Queue queueB, CustomExchange customExchange) { return BindingBuilder.bind(queueB).to(customExchange).with(ROUTING_KEY_B_NAME).noargs(); } //先初始化队列 @Bean @ConditionalOnBean(Queue.class) MQExchangeConsumer mqExchangeConsumer() { return new MQExchangeConsumer(); } }

 //发送消息,注意:延时时间设置:Delay > 0, Delay =< ?ERL_MAX_T (In Erlang a timer can be set up to (2^32)-1 milliseconds in the future).

 rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_A_NAME, msg + "a", message -> {
// 本质还是设置header的x-delay=10000,可以参考日志信息 message.getMessageProperties().setDelay(10000); return message; }); rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_B_NAME, msg + "b", message -> { message.getMessageProperties().setHeader("x-delay", 20000); return message; });

 //结果

技术分享图片

 

 

 7.停用延时插件

执行命令:rabbitmq-plugins disable rabbitmq_delayed_message_exchange。

注:停用后延时未分发的消息将会丢失

8.其他

消息分发前是存储在节点下的Mnesia table中,通过计时器调度实现分发,官网写到:这个插件的设计并不适合大量延迟消息的情况(例如100s数千条或数百万条)。因为随着mnesia数据库的增长,延迟消息的延时时间变得难以控制,就很难达到预期的效果

 

二、RabbitMQ之延时消息(2)

原文:https://www.cnblogs.com/Hleaves/p/13594278.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!