根据交换机类型不同,分为3种:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
application:
name: spring-boot-amqp
rabbitmq:
host: xxx.xxx.xxx.xxx
port: 5672
username: rabbit
password: 123456
package com.hxtec.polaris.configure;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* API:BindingBuilder.bind(指定队列).to(交换机).with(路由键);
*/
@Configuration
public class MyAMQPConfig {
final static String tpm = "topic.message";
final static String tp1m = "topic1.message";
final static String tpm1 = "topic.message1";
final static String tpm2 = "topic.message2";
@Bean
public Queue tpm() {
return new Queue(MyAMQPConfig.tpm);
}
@Bean
public Queue tp1m() {
return new Queue(MyAMQPConfig.tp1m);
}
@Bean
public Queue tpm1() {
return new Queue(MyAMQPConfig.tpm1);
}
@Bean
public Queue tpm2() {
return new Queue(MyAMQPConfig.tpm2);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topic.exchange");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
@Bean
DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
/**
* topic单播,只给topic.message发送
* @param tpm
* @param topicExchange
* @return
*/
@Bean
Binding bindingTopicExchangeMessage(Queue tpm, TopicExchange topicExchange) {
return BindingBuilder.bind(tpm).to(topicExchange).with("topic.message");
}
/**
* Fanout广播,给绑定fanoutExchange的queues全部发送
* @param tpm
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingFanoutExchangeMessage(Queue tpm, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(tpm).to(fanoutExchange);
}
/**
* 绑定规则
* @param tpm
* @param directExchange
* @return
*/
@Bean
Binding bindingDirectExchangeMessage(Queue tpm,DirectExchange directExchange) {
return BindingBuilder.bind(tpm).to(directExchange).with("topic.message#");
}
}
package com.hxtec.polaris.ampq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* API:amqpTemplate.convertAndSend("交换机名",“路由键”,“消息内容”)
*/
@Component
public class RabbitHelloSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topic.exchange","tpm", context);
}
}
package com.hxtec.polaris.ampq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @RabbitListener(queues = "direct"):监听器监听指定队列
*/
@Component
@RabbitListener(queues = "topic.message")
public class RabbitHelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
原文:https://www.cnblogs.com/faramita/p/12779502.html