首页 > 编程语言 > 详细

RabbitMQ整合Spring

时间:2020-07-09 15:47:43      阅读:66      评论:0      收藏:0      [点我收藏+]

之前我们使用 RabbitMQ 原生的 API 方法来实现MQ的使用,Spring 也提供了 RabbitMQ 的集成,让我们更方便的使用MQ,让我们来学习下吧。

Spring AMQP 是基于 Spring 框架的 AMQP 消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO 的消息监听等,很大方便我们使用 RabbitMQ 程序的相关开发。

一、RabbitAdmin 管理组件

1.1 准备工作:

  1. 添加 Spring AMQP 依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>
  1. 声明 Bean 对象
@Configuration
public class RabbitMQConfig {
    /**
     * 注入连接工厂对象
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("111.231.83.100");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 必须显式设置为 True ,否则 Spring 容器不会加载
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

1.2 Exchange 操作

相关方法:

方法定义 作用
void declareExchange(Exchange exchange) 声明交换机
boolean deleteExchange(String exchange) 删除交换机

添加交换机

@SpringBootTest
public class ExchangeAddTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void shouldAddExchangeSuccess() {
        rabbitAdmin.declareExchange(new DirectExchange("admin.direct", true, false));
        rabbitAdmin.declareExchange(new TopicExchange("admin.topic", false, true));
        rabbitAdmin.declareExchange(new FanoutExchange("admin.fanout", false, false));
    }
}    

技术分享图片

删除交换机

@SpringBootTest
public class ExchangeAddTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @Test
    public void shouldDeleteExchangeSuccess() {
        boolean result = rabbitAdmin.deleteExchange("admin.direct");
        Assert.assertTrue(result);
    }
}    

技术分享图片

1.3 Queue 操作

方法定义 作用
Queue declareQueue() 声明默认队列
String declareQueue(Queue queue) 申明给定的队列
boolean deleteQueue(String queueName) 删除队列
void deleteQueue(String queueName, boolean unused, boolean empty) 删除队列
void purgeQueue(String queueName, boolean noWait) 清除队列信息,noWait = true 时异步执行
int purgeQueue(String queueName) 清除队列信息
Properties getQueueProperties(String queueName) 获取指定队列的属性

声明队列

@SpringBootTest
public class QueueTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void shouldAddQueueSuccess() {
        // 创建默认队列
        Queue defaultQueue = rabbitAdmin.declareQueue();
        Assert.assertNotNull(defaultQueue);
        Assert.assertEquals(false,defaultQueue.isDurable());

        // 创建指定名称和是否持久化属性的队列
        String  queueName =  rabbitAdmin.declareQueue(new Queue("orderQueue",true));
        Assert.assertNotNull(queueName);
        Assert.assertEquals("orderQueue",queueName);
    }
}    

技术分享图片

注: 默认的队列因为设置 exclusive = true ,导致在其连接断开的时候自动删除,所以图中看不到。

删除队列

@SpringBootTest
public class QueueTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
   
    @Test
    public void shouldDeleteQueueSuccess() {
        boolean result = rabbitAdmin.deleteQueue("orderQueue");
        Assert.assertTrue(result);
    }   
}    

技术分享图片

1.4 Binding 绑定

方法定义 作用
void declareBinding(Binding binding) 声明队列与交换机的绑定
void removeBinding(Binding binding) 删除队列与交换机的绑定

声明队列与交换机的绑定

@SpringBootTest
public class BindingTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void shouldBindingSuccess() {
        // 交换机名称
        String exchange = "admin.topic";
        // 队列名称
        String queueName = "orderQueue";
        // 1.创建绑定关系对象
        Binding binding =
                BindingBuilder
                        // 创建队列
                        .bind(new Queue(queueName, true))
                        // 创建交换机
                        .to(new TopicExchange(exchange, true, false))
                        // 指定路由 Key
                        .with("order#");
        // 2.进行绑定
        rabbitAdmin.declareBinding(binding);
    }

}

技术分享图片

删除队列与交换机的绑定

@SpringBootTest
public class BindingTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
	
    @Test
    public void shouldUnBindingSuccess() {
        // 交换机名称
        String exchange = "admin.topic";
        // 队列名称
        String queueName = "orderQueue";
        Binding binding =
                new Binding(queueName, Binding.DestinationType.QUEUE, exchange, "order#", null);
        rabbitAdmin.removeBinding(binding);
    }
}    

技术分享图片

1.5 bean 注入

除了上面的通过代码显式申明交换机、队列、路由 之外,还可以通过 Bena 注入的形式申明。

@Configuration
public class RabbitMQConfig {

    /**
     * 注入连接工厂对象
     *
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("111.231.83.100");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 必须显式设置为 True ,否则 Spring 容器不会加载
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public TopicExchange beanExchange() {
        return new TopicExchange("beanExchange", true, false);
    }

    @Bean
    public Queue beanQueue() {
        return new Queue("beanQueue", true);
    }

    @Bean
    public Binding beanBinding(TopicExchange beanExchange, Queue beanQueue) {
        return BindingBuilder
                // 创建队列
                .bind(beanQueue)
                // 创建交换机
                .to(beanExchange)
                // 指定路由 Key
                .with("bean#");
    }
}
@SpringBootTest
public class BeanInjectionBindingTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private Binding beanBinding;

    @Test
    public void shouldBindingSuccess() {
        rabbitAdmin.declareBinding(beanBinding);
    }

}

技术分享图片

二、RabbitTemplate 模板组件

如果你看过 RabbitAdmin 的源码,可以看到里面使用到了一个叫做 RabbitTemplate 的对象,它就是 Spring 提供的消息模板,封装了 RabbitMQ 核心 API 的一系列方法,而 RabbitAdmin 是在它之上的另一层封装。

2.1 常用方法

方法定义 作用
void send(Message message) 发送消息
void convertAndSend(Object object) 将 Java 对象包装成 Message 对象并发送 ,Java 对象需要实现 Serializable 序列化接口
Message receive(String queueName) 接收消息
receiveAndConvert(String queueName) 接收消息并将 Message 转换成 Java 对

2.2 发送消息

@SpringBootTest
public class MessageTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void shouldSendMessageSuccess(){
        // 创建消息属性对象
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "信息描述..");
        messageProperties.getHeaders().put("type", "自定义消息类型..");
        // 创建消息对象
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
        // 发送消息
        rabbitTemplate.send("beanExchange","bean#",message);

        // 发送消息时额外增加属性
        Message newMessage = new Message("newMessage".getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("beanExchange", "bean#", newMessage, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }
}    

技术分享图片

2.3 手动接收消息

@SpringBootTest
public class MessageTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;  
	
    @Test
    public void shouldConsumeMessageSuccess() {
        Message msg = rabbitTemplate.receive("beanQueue", 2000l);
        System.out.println("消息内容:" + new String(msg.getBody()));
        final Map<String, Object> headers = msg.getMessageProperties().getHeaders();
        System.out.println("=======消息头属性=======");
        for (String key : headers.keySet()) {
            System.out.println("key =" + key + " ; value =" + headers.get(key));
        }
    }
}    

执行方法,观察控制台输出:

消息内容:Hello RabbitMQ
=======消息头属性=======
key =type ; value =自定义消息类型..
key =desc ; value =信息描述..

再次执行方法,观察控制台输出:

消息内容:newMessage
=======消息头属性=======
key =type ; value =自定义消息类型..
key =attr ; value =额外新加的属性
key =desc ; value =额外修改的信息描述

2.4 消息监听容器

在实际项目中我们不可能采用手动接收消息的形式来消费消息,这个时候 Spring 就为我们提供了一个消息监听容器 SimpleMessageListenerContainer

它的功能如下:

* 监听多个队列
* 设置消费者消费数量
* 设置消息确认和自动确认模式
* 是否重回队列
* 异常捕获 handel 函数
* 设置消费者属性
* 设置具体的监听器和消息转换器

SimpleMessageListenerContainer 可以在运行过程中动态修改属性,如修改消费者消费数量大小、接收消息的模式等

@Configuration
public class RabbitMQConfig {
    ......
	@Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,Queue beanQueue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 设置监听队列,可以有多个
        container.setQueues(beanQueue);
        // 设置并发消费者数量
        container.setConcurrentConsumers(1);
        // 设置最大并发消费者数量
        container.setMaxConcurrentConsumers(5);
        // 设置是否重回队列
        container.setDefaultRequeueRejected(false);
        // 设置签收模式,这里为了演示使用自动签收,实际项目中需要使用手动签收 AcknowledgeMode.MANUAL
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // 设置消费端标签策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        // 设置消息监听
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.err.println("消费端监听:" + msg);
            }
        });
        return container;
    }
}

启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:

消费端监听:Hello RabbitMQ

证明消费端监听并消费成功。

2.5 消息监听适配器

除了直接使用 ChannelAwareMessageListener 实现消息事件监听外,还可以通过消息监听适配器(MessageListenerAdapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。允许监听器方法对消息内容类型进行操作,完全独立于 Rabbit API

实际上就是相当于自己实现 ChannelAwareMessageListener 功能。

  1. 新建 MessageDelegate
public class MessageDelegate {
    public void consumeMessage(byte[] messageBody) {
        System.err.println("默认方法, 消息内容:" + new String(messageBody));
    }
}
  1. 替换原 ChannelAwareMessageListener 事件
//   container.setMessageListener(new ChannelAwareMessageListener() {
//       @Override
//       public void onMessage(Message message, Channel channel) throws Exception {
//           String msg = new String(message.getBody());
//           System.err.println("消费端监听:" + msg);
//       }
//   });
// 设置消息监听
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
adapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(adapter);
  1. 启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:
默认方法, 消息内容:Hello RabbitMQ

证明消费端监听并消费成功。

我们还可以将队列名和方法做绑定,实现转发功能:

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
// adapter.setDefaultListenerMethod("consumeMessage");
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put(beanQueue.getName(), "consumeMessage");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);    

2.6 消息转换器

我们现在发送和接受消息的类型都是二进制形式传输,我们可以通过 MessageConverter 进行转换。

  1. 新建 TextMessageConverter
public class TextMessageConverter implements MessageConverter {
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}
  1. adapter 设置转换类
adapter.setMessageConverter(new TextMessageConverter());
  1. MessageDelegate 新增 字符串参数的方法
public class MessageDelegate {
	......
    public void consumeMessage(String messageBody){
        System.err.println("字符串类型, 消息内容:" + new String(messageBody));
    }
}
  1. 新增发送文本消息测试方法
@Test
public void shouldSendTextMessageSuccess() {
    // 创建消息属性对象
    MessageProperties messageProperties = new MessageProperties();
    // 通过设置属性,让消费端知道要将消息内容转换成文本类型
    messageProperties.setContentType("text");
    // 创建消息对象
    Message message = new Message("字符串消息".getBytes(), messageProperties);
    // 发送消息
    rabbitTemplate.send("beanExchange", "bean#", message);
}

启动应用后,执行发送消息测试方法,观察控制台输出:

字符串类型, 消息内容:字符串消息

RabbitMQ整合Spring

原文:https://www.cnblogs.com/markLogZhu/p/13273809.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!