RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
生产者,RabbitMQ,消费者:
生产者将消息放在消息中间件的交换机或者队列中,交换机对应着消息队列,消费者从消息队列中取出生产者所存放的消息。实现消息的分发。消费者与消息队列绑定。
2、安装
尽量不要安装在windows中,安装在虚拟机中,可以直接下载安装包,也可以使用docker拉取。
docker拉取:先查询是否有该镜像
docker search rabbitmq:management
有则可以拉取
docker pull rabbitmq:management
然后创建容器并发布
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
【 -p 15672:15672】 是控制平台docker映射到系统的对应端口
【 -p 5672:5672】 是应用程序的访问端口
3、使用说明:
首先要新建一个虚拟主机,然后给虚拟主机绑定用户,通过生产者发送消息时,要连接到server,连接到server中对应的某一个虚机主机,通过具体的用户名密码,紧接着才可以把消息发布给交换机或者消息队列中,进而消费者才能去消息队列中消费消息(消费者也需要连接到rabbitMQ中的server,以及它对应的虚拟主机),当生产者发送完消息后,它就可以进行别的工作了,它与消费者是完全解耦的,消费者只需要去消息队列中查是否有对应的消息即可,当需要再次发送消息时,是需要重新建立连接即可。
虚拟主机就类似于数据库中的库。
学习时,生产者可以使用Test环境进行测试,而消费不行,因为消费者需要一直监听队列中的信息,若使用Test,那么或许当消费岗刚消费了这个消息时,线程控制权已经失去了,那么它便没有可能去对body做处理。
所以要放在main函数中。
点对点模式“Hello World”
引入依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency>
需要一个消息提供者,一个消息消费者和RabbitMQ
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { /** * 点对点 只能有一个消费者 业务场景 用户注册进行短信验证时,短信验证部分可以交由消费这边的系统去完成 * @param args * @throws IOException * @throws TimeoutException */ //消费消息的代码 public static void main(String[] args) throws IOException, TimeoutException { //获取虚拟主机的连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.235.130"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/ems"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("ems"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.basicConsume("hello", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } } ); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Provider { //生产消息的代码 @Test public void testSendMessage() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接rabbitmq的主机 connectionFactory.setHost("192.168.235.130"); //设置端口号 connectionFactory.setPort(5672); //设置连接哪个虚拟主机 connectionFactory.setVirtualHost("/ems"); //设置访问虚拟主机的用户名密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("ems"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //通过连接获取连接中的通道对象 Channel channel = connection.createChannel(); //通道绑定对应的消息队列 //参数一 队列的名称,不存在会自动创建 //参数二 用来定义队列的特性 是否持久化 true持久化(不管是否重启Rabbitmq,队列都会存在,当RabbitMQ关闭时他会把队列存在磁盘中) //参数三 是否独占队列 true独占 false可以被其他连接使用 //参数四 是否在消费完成后自动删除队列 true自动删除 false 不会自动删除 //参数五 附加参数 channel.queueDeclare("hello",false,false,false,null); //发布消息 //参数一 交换机名称 //参数二 队列名称 //参数三 传递消息的额外设置 //参数四 传递的消息 channel.basicPublish("","hello",null,"hello rabbitmq".getBytes()); //关闭通道和连接 channel.close(); connection.close(); } }
为了方便后续学习,可以将共同的代码抽取出来
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Objects; public class RabbitMQConnection { //工厂是重量级的 每次都创建耗费资源 所以做一个静态的成员 只创建一次 private static ConnectionFactory connectionFactory; //具体赋值是在类加载时执行,只执行一次 static{ connectionFactory = new ConnectionFactory(); //属性的赋值一旦确定下来很少变 所以也放在static中 connectionFactory.setHost("192.168.235.130"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/ems"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("ems"); } //定义提供连接对象的方法 public static Connection getConnection(){ try { return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } //关闭通道和关闭连接的方法 public static void closeConnectionAndChanel(Channel channel,Connection connection){ try { if(Objects.nonNull(channel)) {channel.close();} if (Objects.nonNull(connection)) {connection.close();} } catch (Exception e) { e.printStackTrace(); } } }
原文:https://www.cnblogs.com/jamers-rz/p/14683342.html