首页 > 编程语言 > 详细

SpringBoot(5) ------>整合RabbitMQ

时间:2021-01-16 21:51:27      阅读:19      评论:0      收藏:0      [点我收藏+]

1、向pom文件添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、application.yml配置

#rabbitmq配置
spring:
  rabbitmq:
    host: 192.168.0.18
    username: guest
    password: guest   #默认guest
    virtual-host: /   #默认/ 可以不写

3、rabbit配置类

 1)启动时创建交换机、消息队列与绑定规则配置

/**
 * Declares the queues, exchanges and bindings used by the application as
 * Spring beans.
 *
 * <p>Topology declared here:
 * <ul>
 *   <li>Queues: {@code EmpQueue}, {@code LeaderQueue}</li>
 *   <li>Direct exchange {@code userDirectExchange}:
 *       {@code emp.queue -> EmpQueue}, {@code leader.queue -> LeaderQueue}</li>
 *   <li>Fanout exchange {@code userFanoutExchange}: both queues</li>
 *   <li>Topic exchange {@code userTopicExchange}:
 *       {@code emp.# -> EmpQueue}, {@code leader.# -> LeaderQueue}</li>
 * </ul>
 */
@Configuration
public class RabbitConfig {

    /**
     * @return the queue named {@code EmpQueue}, created with the durable flag set
     */
    @Bean
    public Queue empQueue() {
        return new Queue("EmpQueue", true);
    }

    /**
     * @return the queue named {@code LeaderQueue}, created with the durable flag set
     */
    @Bean
    public Queue leaderQueue() {
        return new Queue("LeaderQueue", true);
    }

    /**
     * @return the direct exchange {@code userDirectExchange} (durable, not auto-delete)
     */
    @Bean
    DirectExchange userDirectExchange() {
        return new DirectExchange("userDirectExchange", true, false);
    }

    /**
     * @return the fanout exchange {@code userFanoutExchange} (durable, not auto-delete)
     */
    @Bean
    FanoutExchange userFanoutExchange() {
        return new FanoutExchange("userFanoutExchange", true, false);
    }

    /**
     * @return the topic exchange {@code userTopicExchange} (durable, not auto-delete)
     */
    @Bean
    TopicExchange userTopicExchange() {
        return new TopicExchange("userTopicExchange", true, false);
    }

    /**
     * @return binding of {@code EmpQueue} to the direct exchange with routing key {@code emp.queue}
     */
    @Bean
    Binding empQueueToUserDirect() {
        return BindingBuilder.bind(empQueue()).to(userDirectExchange()).with("emp.queue");
    }

    /**
     * @return binding of {@code LeaderQueue} to the direct exchange with routing key {@code leader.queue}
     */
    @Bean
    Binding leaderQueueToUserDirect() {
        // Fixed: this previously bound empQueue(), so LeaderQueue never received
        // "leader.queue" messages and EmpQueue received them instead.
        return BindingBuilder.bind(leaderQueue()).to(userDirectExchange()).with("leader.queue");
    }

    /**
     * @return binding of {@code EmpQueue} to the fanout exchange (no routing key)
     */
    @Bean
    Binding empQueueToUserFanout() {
        return BindingBuilder.bind(empQueue()).to(userFanoutExchange());
    }

    /**
     * @return binding of {@code LeaderQueue} to the fanout exchange (no routing key)
     */
    @Bean
    Binding leaderQueueToUserFanout() {
        return BindingBuilder.bind(leaderQueue()).to(userFanoutExchange());
    }

    /**
     * @return binding of {@code EmpQueue} to the topic exchange with pattern {@code emp.#}
     */
    @Bean
    Binding empQueueToUserTopic() {
        return BindingBuilder.bind(empQueue()).to(userTopicExchange()).with("emp.#");
    }

    /**
     * @return binding of {@code LeaderQueue} to the topic exchange with pattern {@code leader.#}
     */
    @Bean
    Binding leaderQueueToUserTopic() {
        return BindingBuilder.bind(leaderQueue()).to(userTopicExchange()).with("leader.#");
    }

}

 2)发送对象序列化配置

 在配置类(RabbitConfig)中添加消息转换器,使发送的对象以JSON格式序列化

    /**
     * Exposes a {@link Jackson2JsonMessageConverter} bean so that message
     * bodies are converted to JSON.
     *
     * @return a new Jackson2JsonMessageConverter
     */
    @Bean
    public MessageConverter messageConverter() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }

4、接收消息

  添加@RabbitListener接收队列消息

package com.donleo.amqp.service;

import com.donleo.amqp.model.Book;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * Message consumers registered through {@code @RabbitListener}.
 *
 * @author liangd
 * date 2020-12-12 11:19
 */
@Service
public class BookService {
    /**
     * Receives messages from the {@code donleo} queue with the payload
     * already converted to a {@link Book}, and prints it.
     *
     * @param book the converted message payload
     */
    @RabbitListener(queues = "donleo")
    public void testBook(Book book){
        System.out.println("接收消息"+book);
    }

    /**
     * Receives the raw {@link Message} from the {@code donleo.emps} queue and
     * prints its body and its properties.
     *
     * @param message the raw AMQP message (body bytes plus properties)
     */
    @RabbitListener(queues = "donleo.emps")
    public void testBoot01(Message message){
        byte[] body = message.getBody();
        MessageProperties properties = message.getMessageProperties();
        // Printing the byte[] directly only shows its identity string ("[B@...");
        // decode it so the actual payload text is visible.
        // NOTE(review): assumes the body is UTF-8 encoded text (e.g. JSON) - confirm against the producer.
        System.out.println(new String(body, java.nio.charset.StandardCharsets.UTF_8));
        System.out.println(properties);
    }
}

5、测试

package com.donleo.amqp;

import com.donleo.amqp.model.Book;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;

/**
 * Integration tests that exercise RabbitMQ through {@link RabbitTemplate} and
 * {@link AmqpAdmin}. They need a reachable broker.
 */
@SpringBootTest
class AmqpApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     * Creates an exchange, a queue and the binding between them through AmqpAdmin.
     *
     * <p>The exchange and queue are declared before the binding so the test does
     * not rely on them being left over from an earlier run. FanoutExchange or
     * TopicExchange can be declared the same way.
     */
    @Test
    void amqpAdmin(){
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.direct"));
        amqpAdmin.declareQueue(new Queue("amqpadmin.product"));
        // Binding rule: queue "amqpadmin.product" <- exchange "amqpadmin.direct"
        // with routing key "amqpadmin.product".
        amqpAdmin.declareBinding(new Binding(
                "amqpadmin.product",
                Binding.DestinationType.QUEUE,"amqpadmin.direct",
                "amqpadmin.product",null));
    }

    /**
     * Unicast (point-to-point) send through a direct exchange.
     */
    @Test
    void sendDirect() {
        // rabbitTemplate.send(exchange, routingKey, message) would require building
        // the message body and properties by hand.
        // convertAndSend is the usual choice: pass the object and it is serialized automatically.
        rabbitTemplate.convertAndSend("exchange.direct","donleo","点对点式");
    }

    /**
     * Receives one message from the {@code donleo} queue and prints it.
     */
    @Test
    void receiveDirect(){
        // receiveAndConvert converts the payload automatically; receive() returns
        // the full message including headers and other properties.
        Object object = rabbitTemplate.receiveAndConvert("donleo");
        System.out.println(object);
    }

    /**
     * Broadcast send through a fanout exchange (empty routing key).
     */
    @Test
    void sendFanout(){
        // A Map payload can be sent the same way, e.g.:
        //   HashMap<String, Object> map = new HashMap<>();
        //   map.put("username","zhangs");
        //   map.put("nickname","张三");
        //   rabbitTemplate.convertAndSend("exchange.fanout","",map);
        rabbitTemplate.convertAndSend("exchange.fanout","",new Book(1,"三国演义","罗贯中"));
    }

    /**
     * Receives one message from the {@code donleo.news} queue and prints it.
     */
    @Test
    void receiveFanout(){
        Object o = rabbitTemplate.receiveAndConvert("donleo.news");
        System.out.println(o);
    }
}

 

SpringBoot(5) ------>整合RabbitMQ

原文:https://www.cnblogs.com/donleo123/p/14287045.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!