由于工作上需要,客户那指定要使用RabbitMQ,之前学过RocketMQ(点我直达1,点我直达2)、ActiveMQ(点我直达1,点我直达2)都没用上。项目时间又赶,今天下午先在自己的阿里云服务器上搭建好RabbitMQ(点我直达),然后去github,百度上找一大堆资料,发现跑不通,没办法,快速学习,整理一套SpringBoot整合RabbitMQ的5种模式(hello world、work queue、Publish/Subscribe、Routing、Topics)。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the RabbitMQ producer demo application.
  Dependency versions without an explicit <version> are managed by the Spring Boot parent POM.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>springboot-rabbitmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-producer</name>
    <description>SpringBoot整合RabbitMQ-生产者</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--
          fastjson releases up to 1.2.80 are affected by the autoType deserialization
          vulnerability (CVE-2022-25845); 1.2.83 is the patched 1.x release.
        -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Spring Boot integration with RabbitMQ (Spring AMQP) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
# Port the embedded web server listens on.
server.port=8888
# RabbitMQ broker connection settings. The host and credential values below are
# masked placeholders and must be replaced with the real broker address and account.
spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxxx
spring.rabbitmq.password=xxxxx
package com.ybchen.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Bootstrap class of the RabbitMQ producer application.
 */
@SpringBootApplication
public class SpringbootRabbitmqProducerApplication {

    /**
     * Starts the Spring application using this class as the primary configuration source.
     *
     * @param args command-line arguments, passed through to Spring unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = SpringbootRabbitmqProducerApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
package com.ybchen.producer.entity;
/**
 * Mutable request/message payload bean carrying an id and a name.
 */
public class Clz {
    /** Identifier of the entry; may be {@code null} until set. */
    private String id;
    /** Display name of the entry; may be {@code null} until set. */
    private String name;

    /**
     * @return the current id, or {@code null} if none was set
     */
    public String getId() {
        return id;
    }

    /**
     * @param id the id to store
     */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * @return the current name, or {@code null} if none was set
     */
    public String getName() {
        return name;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Renders the bean as {@code Clz{id='...', name='...'}}.
     *
     * @return the textual form used when the bean is concatenated into log and message strings
     */
    @Override
    public String toString() {
        // Plain ASCII quotes are required here; typographic quotes are not valid Java literals.
        return "Clz{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}
package com.ybchen.producer.controller;
import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
 * REST endpoint for the "hello world" messaging model: one producer publishing
 * to a single named queue.
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v1")
public class ProducerV1Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one text message built from the posted bean.
     *
     * @param data request body deserialized into a {@link Clz}
     * @return the literal string {@code "ok"} once the send call has returned
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        System.out.println("接收到参数:"+data);
        // Two-argument overload: the first argument is the routing key (used here as
        // the queue name "hello"), the second is the message payload.
        final String payload = "hello ,rabbitmq, now is :" + data;
        rabbitTemplate.convertAndSend("hello", payload);
        return "ok";
    }
}
package com.ybchen.producer.controller; import com.ybchen.producer.entity.Clz; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description:Work queues工作队列 * 假设一个生产者生产2条消息 * 有2个消费者,此时,会通过轮询方式,每个消费者消费一条消息 * @Author:chenyanbin * @Date:2021/1/6 9:32 下午 * @Versiion:1.0 */ @RestController @RequestMapping("/api/v2") public class ProducerV2Controller { @Autowired private RabbitTemplate rabbitTemplate; /** * 公平消费 * @param data * @return */ @PostMapping("sendMQ") public String sendMQ(@RequestBody Clz data) { System.out.println("接收到参数:"+data); //第一个参数:队列名称 //第二个参数:javaBean for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work", "work模型:" + data); } return "ok"; } }
package com.ybchen.producer.controller;
import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint for the publish/subscribe model backed by the {@code logs} exchange.
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v3")
public class ProducerV3Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one text message to the {@code logs} exchange with an empty routing key.
     *
     * @param data request body deserialized into a {@link Clz}
     * @return the literal string {@code "ok"} once the send call has returned
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        System.out.println("接收到参数:" + data);
        // Three-argument overload: exchange name, routing key, payload.
        final String exchange = "logs";
        final String routingKey = "";
        final String payload = "fanout模型:" + data;
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
        return "ok";
    }
}
package com.ybchen.producer.controller;
import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints for the routing model: messages go to the {@code directs}
 * exchange with a fixed routing key per endpoint.
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v4")
public class ProducerV4Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the posted bean with routing key {@code info}.
     *
     * @param data request body deserialized into a {@link Clz}
     * @return the literal string {@code "ok"}
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        return publish("info", data);
    }

    /**
     * Publishes the posted bean with routing key {@code error}.
     *
     * @param data request body deserialized into a {@link Clz}
     * @return the literal string {@code "ok"}
     */
    @PostMapping("sendMQ2")
    public String sendMQ2(@RequestBody Clz data) {
        return publish("error", data);
    }

    /** Logs the bean, sends it to the {@code directs} exchange under the given key, returns "ok". */
    private String publish(String routingKey, Clz data) {
        System.out.println("接收到参数:" + data);
        // Arguments: exchange name, routing key, payload.
        rabbitTemplate.convertAndSend("directs", routingKey, "路由模型:" + data);
        return "ok";
    }
}
package com.ybchen.producer.controller;
import com.ybchen.producer.entity.Clz;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints for the topic model: messages go to the {@code topics}
 * exchange with a dotted routing key per endpoint.
 *
 * @author chenyanbin
 * @version 1.0
 */
@RestController
@RequestMapping("/api/v5")
public class ProducerV5Controller {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the posted bean with routing key {@code user.save}.
     *
     * @param data request body deserialized into a {@link Clz}
     * @return the literal string {@code "ok"}
     */
    @PostMapping("sendMQ")
    public String sendMQ(@RequestBody Clz data) {
        return publishToTopics("user.save", data);
    }

    /**
     * Publishes the posted bean with routing key {@code order.save}.
     *
     * @param data request body deserialized into a {@link Clz}
     * @return the literal string {@code "ok"}
     */
    @PostMapping("sendMQ2")
    public String sendMQ2(@RequestBody Clz data) {
        return publishToTopics("order.save", data);
    }

    /** Logs the bean, sends it to the {@code topics} exchange under the given key, returns "ok". */
    private String publishToTopics(String routingKey, Clz data) {
        System.out.println("接收到参数:" + data);
        // Arguments: exchange name, routing key, payload.
        final String payload = "订阅模型:" + data;
        rabbitTemplate.convertAndSend("topics", routingKey, payload);
        return "ok";
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the RabbitMQ consumer demo application.
  Dependency versions without an explicit <version> are managed by the Spring Boot parent POM.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>springboot-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-consumer</name>
    <description>SpringBoot整合RabbitMQ-消费者</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--
          fastjson releases up to 1.2.80 are affected by the autoType deserialization
          vulnerability (CVE-2022-25845); 1.2.83 is the patched 1.x release.
        -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Spring Boot integration with RabbitMQ (Spring AMQP) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
# Port the embedded web server listens on.
server.port=9999
# RabbitMQ broker connection settings. The host and credential values below are
# masked placeholders and must be replaced with the real broker address and account.
spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
package com.ybchen.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstrap class of the RabbitMQ consumer application; {@code @EnableRabbit}
 * is declared explicitly on top of Spring Boot's auto-configuration.
 */
@SpringBootApplication
@EnableRabbit
public class SpringbootRabbitmqConsumerApplication {

    /**
     * Starts the Spring application using this class as the primary configuration source.
     *
     * @param args command-line arguments, passed through to Spring unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = SpringbootRabbitmqConsumerApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
package com.ybchen.consumer.controller; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description:hello world模型 * @Author:chenyanbin * @Date:2021/1/6 6:03 下午 * @Versiion:1.0 */ //被spring扫描到 @Component //@Queue:没有的话,创建队列 //declare:不持久化,默认:true //autoDelete:是否自动删除 //@Queue(value = "hello",declare = "false",autoDelete = "true") @RabbitListener(queuesToDeclare = @Queue(value = "hello",declare = "false",autoDelete = "true")) public class ConsumerV1 { //监听mq里的队列,接收那个队列里的消息,没有队列的话,去声明一个队列 /** * @RabbitHandler:从队列中拿到消息的回调方法 * @param message */ @RabbitHandler public void helloWorld(String message) { System.out.println(message); } }
package com.ybchen.consumer.controller; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description:Work queues工作队列 * @Author:chenyanbin * @Date:2021/1/6 9:33 下午 * @Versiion:1.0 */ @Component public class ConsumerV2 { //加在方法上,代表会监听这个方法的回调 //消费者-1 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive_1(String message) { System.out.println("work模型 receive_1====>" + message); } //加在方法上,代表会监听这个方法的回调 //消费者-2 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive_2(String message) { System.out.println("work模型 receive_2====>" + message); } }
package com.ybchen.consumer.controller; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description:发布订阅模型,fanout * @Author:chenyanbin * @Date:2021/1/6 9:47 下午 * @Versiion:1.0 */ @Component public class ConsumerV3 { //加在方法上,代表会监听这个方法的回调 //消费者-1 @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = //交换机 @Exchange( value = "logs",type = "fanout" //交换机名字,类型 ) ) }) public void receive_1(String message) { System.out.println("发布订阅模型 receive_1====>" + message); } //加在方法上,代表会监听这个方法的回调 //消费者-1 @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = //交换机 @Exchange( value = "logs",type = "fanout" //交换机名字,类型 ) ) }) public void receive_2(String message) { System.out.println("发布订阅模型 receive_2====>" + message); } }
package com.ybchen.consumer.controller; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description:路由模型 * @Author:chenyanbin * @Date:2021/1/6 10:00 下午 * @Versiion:1.0 */ @Component public class ConsumerV4 { /** * 会接受info、error、warn信息 * @param message */ @RabbitListener( bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange( value = "directs", type = "direct" //交换机名称和类型 ), key = {"info", "error", "warn"} //接收路由的信心 ) } ) public void receive_1(String message) { System.out.println("路由-1:" + message); } /** * 只会接受error的路由信息 * @param message */ @RabbitListener( bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange( value = "directs", type = "direct" //交换机名称和类型 ), key = {"error"} //接收路由的信心 ) } ) public void receive_2(String message) { System.out.println("路由-2:" + message); } }
package com.ybchen.consumer.controller; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description:订阅模式,topic * @Author:chenyanbin * @Date:2021/1/6 10:25 下午 * @Versiion:1.0 */ @Component public class ConsumerV5 { @RabbitListener( bindings = @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange( //交换机 type = "topic", name = "topics" //类型和名称 ), key = {"user.save", "user.*"} ) ) public void receive_1(String message) { System.out.println("订阅模型 receive_1====>" + message); } @RabbitListener( bindings = @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange( //交换机 type = "topic", name = "topics" //类型和名称 ), key = {"order.#", "produce.#","user.*"} //可以使用正则方式 ) ) public void receive_2(String message) { System.out.println("订阅模型 receive_2====>" + message); } }
链接: https://pan.baidu.com/s/1SnutjcAJlQG9Io1Z_F9kJA 密码: itn4
满打满算,RabbitMQ算是告一段落。今天只是停留在会简单使用的层面,具体配置项还不是很了解,各种调优也没有涉及,未来还是要再认真地系统学习一遍滴。今天先到这,洗洗睡啦,ヾ(ToT)Bye~Bye~
原文:https://www.cnblogs.com/chenyanbin/p/14244013.html