首页 > 编程语言 > 详细

SpringBoot整合RabbitMQ,附案例源码

时间:2021-01-06 23:43:46      阅读:28      评论:0      收藏:0      [点我收藏+]

导读

  由于工作需要,客户指定要使用RabbitMQ,之前学过的RocketMQ(点我直达1、点我直达2)、ActiveMQ(点我直达1、点我直达2)都没用上。项目时间又赶,今天下午先在自己的阿里云服务器上搭建好RabbitMQ(点我直达),然后去GitHub、百度上找了一大堆资料,发现都跑不通,没办法,只好快速学习,整理出一套SpringBoot整合RabbitMQ的5种模式(hello world、work queue、Publish/Subscribe、Routing、Topics)。

项目结构

技术分享图片

 

项目演示

技术分享图片

阿里云服务器搭建RabbitMQ

点我直达

生产者项目

项目结构

技术分享图片

 

生产者代码

技术分享图片
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the producer application (artifact springboot-rabbitmq-producer). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Parent POM: spring-boot-starter-parent; the Spring Boot dependencies below declare no version of their own. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>springboot-rabbitmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-producer</name>
    <description>SpringBoot整合RabbitMQ-生产者</description>

    <!-- Source/report encoding and the Java language level used for the build. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: the producer exposes REST endpoints that trigger message publishing. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Test starter, limited to the test scope. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- fastjson, with an explicitly pinned version. -->
        <!-- NOTE(review): none of the producer sources shown import fastjson; confirm it is still needed. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!-- Spring Boot integration with RabbitMQ (AMQP starter, provides RabbitTemplate). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin for the application build. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
pom.xml
技术分享图片
# HTTP port of the producer application.
server.port=8888
# RabbitMQ broker connection settings; the host and credentials below are placeholders to be replaced.
spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxxx
spring.rabbitmq.password=xxxxx
application.properties
技术分享图片
package com.ybchen.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstrap class of the producer application.
 */
@SpringBootApplication
public class SpringbootRabbitmqProducerApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        // Same start-up path as the static SpringApplication.run(Class, String...) shortcut.
        SpringApplication application =
                new SpringApplication(SpringbootRabbitmqProducerApplication.class);
        application.run(args);
    }

}
SpringbootRabbitmqProducerApplication.java
技术分享图片
package com.ybchen.producer.entity;

/**
 * Mutable bean with an id and a name.
 *
 * <p>The producer controllers accept it as the {@code @RequestBody} payload, so it keeps
 * the implicit no-argument constructor and the getter/setter pairs.
 */
public class Clz {
    private String id;
    private String name;

    /**
     * @return the id, or {@code null} if it was never set
     */
    public String getId() {
        return id;
    }

    /**
     * @param id the id to store
     */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * @return the name, or {@code null} if it was never set
     */
    public String getName() {
        return name;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Renders the bean as {@code Clz{id='...', name='...'}}.
     *
     * @return the textual form that the controllers append to outgoing messages
     */
    @Override
    public String toString() {
        // The published listing had typographic quotes here, which do not compile;
        // they are restored as ASCII single-quote char literals.
        return "Clz{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}
Clz.java
技术分享图片
package com.ybchen.producer.controller;

import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * REST entry point for the "hello world" messaging model.
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v1")
public class ProducerV1Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the request body and sends one text message with routing key {@code hello}.
     *
     * @param data request body; its {@code toString()} form is appended to the message text
     * @return the literal {@code "ok"}
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        final String received = "接收到参数:" + data;
        System.out.println(received);

        // First argument: routing key (the original author's note calls it the queue name).
        // Second argument: the payload, here a plain String rather than the bean itself.
        final String payload = "hello ,rabbitmq, now is :" + data;
        rabbitTemplate.convertAndSend("hello", payload);
        return "ok";
    }

}
ProducerV1Controller.java
技术分享图片
package com.ybchen.producer.controller;

import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point for the work-queue model.
 *
 * <p>Per the original author's note: when one producer publishes several messages and two
 * consumers listen on the same queue, the messages are handed out in turn (round-robin).
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v2")
public class ProducerV2Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the request body and sends the same text message ten times with routing key
     * {@code work}.
     *
     * @param data request body; its {@code toString()} form is appended to the message text
     * @return the literal {@code "ok"}
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        System.out.println("接收到参数:" + data);

        // Publish a batch of ten so that both consumers of the queue receive a share.
        int remaining = 10;
        while (remaining-- > 0) {
            rabbitTemplate.convertAndSend("work", "work模型:" + data);
        }
        return "ok";
    }
}
ProducerV2Controller.java
技术分享图片
package com.ybchen.producer.controller;

import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point for the publish/subscribe (fanout) model.
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v3")
public class ProducerV3Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the request body and publishes one text message to the {@code logs} exchange
     * with an empty routing key.
     *
     * @param data request body; its {@code toString()} form is appended to the message text
     * @return the literal {@code "ok"}
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        System.out.println("接收到参数:" + data);

        // Three-argument form: exchange name, routing key, payload.
        final String exchange = "logs";
        final String routingKey = "";
        final String payload = "fanout模型:" + data;
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
        return "ok";
    }
}
ProducerV3Controller.java
技术分享图片
package com.ybchen.producer.controller;

import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point for the routing (direct exchange) model.
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v4")
public class ProducerV4Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to the {@code directs} exchange with routing key {@code info}.
     *
     * @param data request body; its {@code toString()} form is appended to the message text
     * @return the literal {@code "ok"}
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        return publish("info", data);
    }

    /**
     * Publishes one message to the {@code directs} exchange with routing key {@code error}.
     *
     * @param data request body; its {@code toString()} form is appended to the message text
     * @return the literal {@code "ok"}
     */
    @PostMapping("sendMQ2")
    public String sendMQ2(@RequestBody Clz data) {
        return publish("error", data);
    }

    /** Logs the request body, then sends it to the "directs" exchange under the given routing key. */
    private String publish(String routingKey, Clz data) {
        System.out.println("接收到参数:" + data);
        // Arguments: exchange name, routing key, payload.
        rabbitTemplate.convertAndSend("directs", routingKey, "路由模型:" + data);
        return "ok";
    }
}
ProducerV4Controller.java
技术分享图片
package com.ybchen.producer.controller;

import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point for the topic model.
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v5")
public class ProducerV5Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to the {@code topics} exchange with routing key {@code user.save}.
     *
     * @param data request body; its {@code toString()} form is appended to the message text
     * @return the literal {@code "ok"}
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        return publish("user.save", data);
    }

    /**
     * Publishes one message to the {@code topics} exchange with routing key {@code order.save}.
     *
     * @param data request body; its {@code toString()} form is appended to the message text
     * @return the literal {@code "ok"}
     */
    @PostMapping("sendMQ2")
    public String sendMQ2(@RequestBody Clz data) {
        return publish("order.save", data);
    }

    /** Logs the request body, then sends it to the "topics" exchange under the given routing key. */
    private String publish(String routingKey, Clz data) {
        System.out.println("接收到参数:" + data);
        // Arguments: exchange name, routing key, payload.
        rabbitTemplate.convertAndSend("topics", routingKey, "订阅模型:" + data);
        return "ok";
    }
}
ProducerV5Controller.java

消费者项目

项目结构

技术分享图片

 

消费者代码

技术分享图片
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the consumer application (artifact springboot-rabbitmq-consumer). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Parent POM: spring-boot-starter-parent; the Spring Boot dependencies below declare no version of their own. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>springboot-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-consumer</name>
    <description>SpringBoot整合RabbitMQ-消费者</description>

    <!-- Source/report encoding and the Java language level used for the build. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Test starter, limited to the test scope. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- fastjson, with an explicitly pinned version. -->
        <!-- NOTE(review): none of the consumer sources shown import fastjson; confirm it is still needed. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!-- Spring Boot integration with RabbitMQ (AMQP starter, provides the listener annotations). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin for the application build. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
pom.xml
技术分享图片
# HTTP port of the consumer application (the producer uses 8888).
server.port=9999
# RabbitMQ broker connection settings; the host and credentials below are placeholders to be replaced.
spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
application.properties
技术分享图片
package com.ybchen.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstrap class of the consumer application; {@code @EnableRabbit} switches on processing
 * of the {@code @RabbitListener} annotations used by the consumer components.
 */
@SpringBootApplication
@EnableRabbit
public class SpringbootRabbitmqConsumerApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        // Same start-up path as the static SpringApplication.run(Class, String...) shortcut.
        SpringApplication application =
                new SpringApplication(SpringbootRabbitmqConsumerApplication.class);
        application.run(args);
    }

}
SpringbootRabbitmqConsumerApplication.java
技术分享图片
package com.ybchen.consumer.controller;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the "hello world" model: listens on the {@code hello} queue and prints
 * every text message it receives.
 *
 * <p>The queue is declared on start-up if it does not exist yet, as a non-durable,
 * auto-delete queue. The original annotation used {@code declare = "false"}, which tells
 * the admin NOT to declare the queue at all, the opposite of the intent stated in the
 * original comments ("create the queue if it is missing", "non-persistent"). The attribute
 * that controls persistence is {@code durable}.
 *
 * <p>NOTE(review): if a {@code hello} queue with different attributes already exists on the
 * broker, this declaration will not match it; delete the old queue or align the attributes.
 *
 * @author chenyanbin
 * @version 1.0
 */
// Registered as a Spring bean so the listener annotation is picked up.
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "false", autoDelete = "true"))
public class ConsumerV1 {

    /**
     * Callback invoked for each String message taken from the queue.
     *
     * @param message the message body
     */
    @RabbitHandler
    public void helloWorld(String message) {
        System.out.println(message);
    }
}
ConsumerV1.java
技术分享图片
package com.ybchen.consumer.controller;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the work-queue model: two listener methods attached to the same
 * {@code work} queue.
 *
 * @author chenyanbin
 * @version 1.0
 */
@Component
public class ConsumerV2 {

    /**
     * Consumer 1: prints each message it receives from the {@code work} queue.
     *
     * @param message the message body
     */
    @RabbitListener(queuesToDeclare = {@Queue(value = "work")})
    public void receive_1(String message) {
        final String line = "work模型 receive_1====>" + message;
        System.out.println(line);
    }

    /**
     * Consumer 2: prints each message it receives from the {@code work} queue.
     *
     * @param message the message body
     */
    @RabbitListener(queuesToDeclare = {@Queue(value = "work")})
    public void receive_2(String message) {
        final String line = "work模型 receive_2====>" + message;
        System.out.println(line);
    }
}
ConsumerV2.java
技术分享图片
package com.ybchen.consumer.controller;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the publish/subscribe (fanout) model: each listener binds its own unnamed
 * queue (a temporary queue, per the original author's note) to the {@code logs} fanout exchange.
 *
 * @author chenyanbin
 * @version 1.0
 */
@Component
public class ConsumerV3 {

    /**
     * Consumer 1: prints each message delivered to its queue.
     *
     * @param message the message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "logs", type = "fanout")
    ))
    public void receive_1(String message) {
        final String line = "发布订阅模型 receive_1====>" + message;
        System.out.println(line);
    }

    /**
     * Consumer 2: prints each message delivered to its queue.
     *
     * @param message the message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "logs", type = "fanout")
    ))
    public void receive_2(String message) {
        final String line = "发布订阅模型 receive_2====>" + message;
        System.out.println(line);
    }
}
ConsumerV3.java
技术分享图片
package com.ybchen.consumer.controller;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the routing model: each listener binds an unnamed queue to the
 * {@code directs} direct exchange under a fixed set of routing keys.
 *
 * @author chenyanbin
 * @version 1.0
 */
@Component
public class ConsumerV4 {

    /**
     * Bound with the routing keys {@code info}, {@code error} and {@code warn}.
     *
     * @param message the message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "directs", type = "direct"),
            key = {"info", "error", "warn"}
    ))
    public void receive_1(String message) {
        final String line = "路由-1:" + message;
        System.out.println(line);
    }

    /**
     * Bound with the routing key {@code error} only.
     *
     * @param message the message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "directs", type = "direct"),
            key = "error"
    ))
    public void receive_2(String message) {
        final String line = "路由-2:" + message;
        System.out.println(line);
    }
}
ConsumerV4.java
技术分享图片
package com.ybchen.consumer.controller;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the topic model: each listener binds an unnamed queue to the {@code topics}
 * topic exchange using routing-key patterns.
 *
 * <p>The patterns are AMQP topic wildcards rather than regular expressions: {@code *} stands
 * for exactly one word and {@code #} for zero or more words.
 *
 * @author chenyanbin
 * @version 1.0
 */
@Component
public class ConsumerV5 {

    /**
     * Bound with {@code user.save} and {@code user.*}.
     *
     * @param message the message body
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics", type = "topic"),
                    key = {"user.save", "user.*"}
            )
    })
    public void receive_1(String message) {
        final String line = "订阅模型 receive_1====>" + message;
        System.out.println(line);
    }

    /**
     * Bound with {@code order.#}, {@code produce.#} and {@code user.*}.
     *
     * @param message the message body
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics", type = "topic"),
                    key = {"order.#", "produce.#", "user.*"}
            )
    })
    public void receive_2(String message) {
        final String line = "订阅模型 receive_2====>" + message;
        System.out.println(line);
    }
}
ConsumerV5.java

案例源码下载

链接: https://pan.baidu.com/s/1SnutjcAJlQG9Io1Z_F9kJA  密码: itn4

尾声

  满打满算,RabbitMQ算是告一段落。今天只是停留在会简单使用的层面,具体配置项还不是很了解,各种调优啥的,未来还是要再认真地系统学习一遍滴。今天先到这,洗洗睡啦,ヾ(ToT)Bye~Bye~

 

SpringBoot整合RabbitMQ,附案例源码

原文:https://www.cnblogs.com/chenyanbin/p/14244013.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!