应用场景: 1.异步处理 : 同步阻塞的(会造成等待), 异步是非阻塞的(不会等待), 批量数据,就可以采用异步处理.
2.系统解耦 : 多个系统之间, 不需要直接交互, 通过消息进行业务流转.
3.流量削峰 : 高负载请求/任务缓冲处理.
消息队列中增加了交换器(Exchange):
1.Direct Exchange 直连交换机, 根据路由键完全匹配进行路由消息队列;
2.Topic Exchange 通配符交换机, #匹配多个单词, *匹配一个单词, 用.隔开的称为一个单词:
3. Fanout Exchange 广播交换机, 投递到所有绑定的队列, 不需要规则.
4. Headers Exchange 基于消息内容中的header属性进行匹配.
依赖pom.xml
<!--RabbitMQ依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
属性配置application.properties
spring.rabbitmq.host=10.10.32.140 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin #开启发送确认 spring.rabbitmq.publisher-confirms=true #开启发送失败退回 spring.rabbitmq.publisher-returns=true #开启ack spring.rabbitmq.listener.simple.acknowledge-mode=manual
第一类, 直连交换机directExchage
发送者MessageSender
@Component public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send(String exchage, String routingKey) { String msg = "你好现在是 " + new Date(); System.out.println("send content = " + msg); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); //发送消息 this.rabbitTemplate.convertAndSend(exchage, routingKey, msg); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { } }
消费者MessageReceiver
@Component @RabbitListener(queues = "queue1") public class MessageReceiver { public void process(String msg, Channel channel, Message message) throws IOException { try { Thread.sleep(3000); System.out.println("睡眠3s"); } catch (InterruptedException e) { e.printStackTrace(); } try { //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("receiver fail"); } } }
配置类RabbitConfig
@Configuration public class RabbitConfig { /** * 定义一个交换器 exchage: DirectExchage 直连交换机, 精确匹配 * @return */ @Bean public DirectExchange directExchange() { //创建一个直连交换器, 他就是在rabbitmq服务器上创建这么一个交换器 return new DirectExchange("exchage1"); } /** * 创建一个队列, 合规队列是用来存放exchage路由过来的消息 * @return */ @Bean public Queue Queuq1() { return new Queue("queue1", true); } /** * 建立起关系, 交换机 + 队列 绑定起来 * @return */ @Bean public Binding bindingDirectExchange(Queue queuq1, DirectExchange directExchange) { return BindingBuilder.bind(queuq1).to(directExchange).with("routingkey1"); } }
Controller访问
@RestController public class MessageController { @Autowired private MessageSender helloSender; /** * 正常发送消息 * @return */ @RequestMapping("/boot/send") public String send () { helloSender.send("exchage1", "routingkey1"); return "success"; } }
第二类,广播交换器 FanoutExchange, 不需要路由匹配
配置类RabbitConfig
/** * 创建一个FanoutExchange交换器, 不需要路由匹配 * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("exchange2"); } /** * 创建队列2 * @return */ public Queue queue2() { return new Queue("queue2", true); } /** * 创建队列3 * @return */ public Queue queue3() { return new Queue("queue3", true); } /** * 把队列2和FanoutExchage交换机绑定 * @param queue2 * @param fanoutExchange * @return */ public Binding bindingFanoutExchage(Queue queue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue2).to(fanoutExchange); } /** * 把队列3和FanoutExchage交换机绑定 * @param queue3 * @param fanoutExchange * @return */ public Binding bindingFanoutExchage2(Queue queue3, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue3).to(fanoutExchange); }
消息发送者, 与第一种一样
@Component public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send(String exchage, String routingKey) { String msg = "你好现在是 " + new Date(); System.out.println("send content = " + msg); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); //发送消息 this.rabbitTemplate.convertAndSend(exchage, routingKey, msg); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { } }
消息接收者2
@Component @RabbitListener(queues = "queue2") public class MessageReceiver { public void process(String msg, Channel channel, Message message) throws IOException { try { Thread.sleep(3000); System.out.println("睡眠3s"); } catch (InterruptedException e) { e.printStackTrace(); } try { //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("receiver fail"); } } }
消息接收者3
@Component @RabbitListener(queues = "queue3") public class MessageReceiver { public void process(String msg, Channel channel, Message message) throws IOException { try { Thread.sleep(3000); System.out.println("睡眠3s"); } catch (InterruptedException e) { e.printStackTrace(); } try { //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("receiver fail"); } } }
Controller访问
@RestController
public class MessageController {
@Autowired
private MessageSender helloSender;
/**
* 正常发送消息
* @return
*/
@RequestMapping("/boot/send")
public String send () {
helloSender.send("exchage2", "");
return "success";
}
}
第三类,Topic Exchange通配符交换机, #匹配多个单词, *匹配一个单词, 用.隔开的称为一个单词
配置类RabbitConfig
/** * 创建交换器 TopicExchange, 模糊匹配 * @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange("exchange3"); } @Bean public Queue queue4() { return new Queue("queue4", true); } @Bean public Queue queue5() { return new Queue("queue5", true); } /** * 和queue4建立联系 * @param queue4 * @param topicExchange * @return */ public Binding bindingTopicExchange(Queue queue4, TopicExchange topicExchange) { return BindingBuilder.bind(queue4).to(topicExchange).with("#.k4.*"); } /** * 和queue5建立联系 * @param queue5 * @param topicExchange * @return */ public Binding bindingTopicExchange2(Queue queue5, TopicExchange topicExchange) { return BindingBuilder.bind(queue5).to(topicExchange).with("#.K5.*"); }
消息发送者, 与第一种一样
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String exchage, String routingKey) {
String msg = "你好现在是 " + new Date();
System.out.println("send content = " + msg);
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
//发送消息
this.rabbitTemplate.convertAndSend(exchage, routingKey, msg);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
}
}
消息接收者4
@Component
@RabbitListener(queues = "queue4")
public class MessageReceiver {
public void process(String msg, Channel channel, Message message) throws IOException {
try {
Thread.sleep(3000);
System.out.println("睡眠3s");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
//丢弃这条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
System.out.println("receiver fail");
}
}
}
消息接收者5
@Component
@RabbitListener(queues = "queue5")
public class MessageReceiver {
public void process(String msg, Channel channel, Message message) throws IOException {
try {
Thread.sleep(3000);
System.out.println("睡眠3s");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
//丢弃这条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
System.out.println("receiver fail");
}
}
}
Controller访问
@RestController public class MessageController { @Autowired private MessageSender helloSender; /** * 正常发送消息 * @return */ @RequestMapping("/boot/send") public String send () { //helloSender.send("exchage2", "xy.k4.z");
helloSender.send("exchage2", "xy.h.k4.z");
return "success"; } }
总结:
RabbitMQ应用十分广泛, 程序员必备.
原文:https://www.cnblogs.com/goujh/p/10933544.html