<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
#tomcat端口
server:
port: 8888
#RabbitMq的配置
spring:
rabbitmq:
host: 192.168.47.132
port: 5672
username: yellowstreak
password: 123456
virtual-host: /yellowstreak
@Configuration public class RabbitMQConfig { //交换机名称 public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange"; //队列名称 public static final String ITEM_QUEUE = "item_queue"; //声明交换机 @Bean("itemTopicExchange") public Exchange topicExchange() { //durable(true): 持久化交换机 return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build(); } //声明队列 @Bean("itemQueue") public Queue itemQueue() { return QueueBuilder.durable(ITEM_QUEUE).build(); } //绑定队列与交换机 @Bean public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("itemTopicExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); } }
@RestController public class SendMessageController { //注入Rabbit模板 @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String sendMessage(@RequestParam String message, @RequestParam String key) { /** * 发送消息 * 参数1: 交换机名称 * 参数2: Routing Key * 参数3: 发送的消息. */ rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, key, message); return "发送消息成功"; } }
#tomcat端口 server: port: 9999 #RabbitMq的配置 spring: rabbitmq: host: 192.168.47.132 port: 5672 username: yellowstreak password: 123456 virtual-host: /yellowstreak
package com.example.consumer; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消费者的监听类 */ @Component public class MyListener { @RabbitListener(queues = "item_queue") public void message(String message) { System.out.println("消费者消费消息了: " + message ); } }
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> </beans>
@ImportResource(locations = "classpath:/spring/spring-rabbitmq.xml")
<!--定义过期队列及其属性,不存在则自动创建--> <rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true"> <rabbit:queue-arguments> <!--投递到该队列的消息如果没有消费都将在6秒之后被删除--> <entry key="x-message-ttl" value-type="long" value="6000"/> </rabbit:queue-arguments> </rabbit:queue>
@RunWith(SpringRunner.class) @SpringBootTest class SpringbootRabbitmqProducerApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; /** * 过期队列消息 * 投递到该队列的消息如果没有消费都将在6秒之后被删除 */ @Test public void ttlQueueTest(){ //发送消息 rabbitTemplate.convertAndSend("my_ttl_queu", "发送到过期队列my_ttl_queue,6秒内不消费则不能再被消费。"); } }
@RunWith(SpringRunner.class) @SpringBootTest class SpringbootRabbitmqProducerApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void ttlMessageTest() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("5000"); Message message = new Message("消息5s过期".getBytes(), messageProperties); //发送消息 rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update", message); } }
<!-- 定义定向交换机中的持久化死信队列,不存在则自动创建 --> <rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true" /> <!--定义广播类型交换机;并绑定上述队列--> <rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true"> <rabbit:bindings> <!--绑定路由键my_ttl_dlx、my_max_dlx,可以将过期的消息转移到my_dlx_queue队列--> <rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue" /> <rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/> </rabbit:bindings> </rabbit:direct-exchange>
<!--定义过期队列及其属性,不存在则自动创建--> <rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true"> <rabbit:queue-arguments> <!--投递到该队列的消息如果没有消费都将在6秒之后被投递到死信交换机--> <entry key="x-message-ttl" value-type="long" value="6000"/> <!--设置当消息过期后投递到对应的死信交换机--> <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--定义限制长度的队列及其属性,不存在则自动创建--> <rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true"> <rabbit:queue-arguments> <!--投递到该队列的消息最多2个消息,如果超过则最早的消息被删除投递到死信交换机--> <entry key="x-max-length" value-type="long" value="2"/> <!--设置当消息过期后投递到对应的死信交换机--> <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--定义定向交换机 根据不同的路由key投递消息--> <rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/> <rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/> </rabbit:bindings> </rabbit:direct-exchange>
@Test public void dlxTTLMessageTest() { rabbitTemplate.convertAndSend("my_normal_exchange", "my_ttl_dlx", "六秒过期后投递给死信交换机"); }
/** * 消费者的监听类 */ @Component public class MyListener { @RabbitListener(queues = "my_dlx_queue") public void message(String message) { System.out.println("消费者消费消息了: " + message ); } }
/** * 超过队列长度消息投递到死信队列 * 投递到一个正常的队列,但是该队列有设置最大消息数,到最大消息数之后队列中最早的消息会被投递到死信交换机(队列) */ @Test public void dlxMaxMessageTest() { rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx", "这是第1个消息"); rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx", "这是第2个消息"); rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx", "这是第3个消息"); }
原文:https://www.cnblogs.com/binwenhome/p/12971764.html