MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
开发中消息队列通常有如下应用场景:
1、任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模式;而AMQP的消息模式更加丰富
市场上常见的消息队列有如下:
目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ
RabbitMQ:
使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ)
角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
设置Virtual Hosts权限:
user:用户名
configure :一个正则表达式,用户对符合该正则表达式的所有资源拥有 configure 操作的权限
write:一个正则表达式,用户对符合该正则表达式的所有资源拥有 write 操作的权限
read:一个正则表达式,用户对符合该正则表达式的所有资源拥有 read 操作的权限
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
//创建链接工厂对象 //设置RabbitMQ服务主机地址,默认localhost //设置RabbitMQ服务端口,默认5672 //设置虚拟主机名字,默认/ //设置用户连接名,默认guest //设置链接密码,默认guest //创建链接 //创建频道 //声明队列 //创建消息 //消息发送 //关闭资源
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //创建链接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置RabbitMQ服务主机地址,默认localhost connectionFactory.setHost("192.168.211.132"); //设置RabbitMQ服务端口,默认5672 connectionFactory.setPort(5672); //设置虚拟主机名字(在rabbitmq服务器中,消息队列是放在虚拟主机中的,这是为了更好分类管理各种消息队列,一般会加/) //虚拟主机名字得先在服务器中手动添加一个,否则会找不到虚拟主机 connectionFactory.setVirtualHost("/qianyi"); //设置用户连接名,默认guest(你想用哪个rabbitmq服务器中的账户就填哪个的账号密码) connectionFactory.setUsername("guest"); //设置链接密码,默认guest connectionFactory.setPassword("guest"); //创建链接(在本身和rabbitmq服务器之间建立连接,类似跟redis、mysql之间建立连接) Connection connection = connectionFactory.newConnection(); //创建频道(在本身和指定的rabbitmq服务器中的指定虚拟主机之间建立稳定、快速的通道) Channel channel = connection.createChannel(); //声明队列(说明要在rabbitmq服务器中指定的虚拟主机中的哪条消息队列) /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi1",true,false,false,null); //创建消息 String message = "hello!qianyi!"; //消息发送 /** * 消息发送 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage(不写就填空串) * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性(没有填null) * 参数4:消息内容(消息内容是字符串,需要转换成字节数组才能传输) */ channel.basicPublish("", "qianyi1", null, message.getBytes()); //关闭资源(连接和频道的) channel.close(); connection.close(); }
}
//创建链接工厂对象 //设置RabbitMQ服务主机地址,默认localhost //设置RabbitMQ服务端口,默认5672 //设置虚拟主机名字,默认/ //设置用户连接名,默认guest //设置链接密码,默认guest //创建链接 //创建频道 //创建队列 //创建消费者,并设置消息处理 //消息监听 //关闭资源(不建议关闭,建议一直监听消息)
public static void main(String[] args) throws IOException, TimeoutException { //创建链接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置RabbitMQ服务主机地址,默认localhost connectionFactory.setHost("192.168.211.132"); //设置RabbitMQ服务端口,默认5672 connectionFactory.setPort(5672); //设置虚拟主机名字(rabbitmq服务器上创建的虚拟主机) connectionFactory.setVirtualHost("qianyi"); //设置用户连接名,默认guest(指定用哪个用户登录,guest是默认超级管理员) connectionFactory.setUsername("guest"); //设置链接密码,默认guest connectionFactory.setPassword("guest"); //创建链接(连接到rabbitmq服务器) Connection connection = connectionFactory.newConnection(); //创建频道(创建连接rabbitmq服务器之间的稳定、高效的频道,持久通讯) Channel channel = connection.createChannel(); //申明队列,指定到哪个队列获取消息 /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi1",true,false,false,null); //创建消费者,并设置消息处理(DefaultConsumer:消息消费者,参数传入创建的频道)然后再重写handleDelivery方法,可以用lambab表达式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body, "UTF-8"); //输出获得的消息内容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息监听 /** * 消息监听 * 要监听哪个队列?当消费者收到消息之后是否自动告诉rebbitmq服务器已经收到?收到消息之后,如何处理呢? * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调(传入上面创建的消费者对象,这个消费者对象中对做了对收到的消息处理) */ channel.basicConsume("qianyi1",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); }
}
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
在rabbitMQ中消费者是一定要到某个消息队列中去获取消息的
Work Queues
与入门程序的简单模式
相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具类,将之前的一直重复代码截取到这个工具类中 Connection connection = QueueUtil.queueUtil(); //创建频道(在本身和指定的rabbitmq服务器中的指定虚拟主机之间建立稳定、快速的通道) Channel channel = connection.createChannel(); //声明队列(说明要在rabbitmq服务器中指定的虚拟主机中的哪条消息队列) /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi2",true,false,false,null); //创建消息(因为是发给多个消费者,所以进行for循环) for (int i = 0; i <= 3; i++) { String message = "hello!qianyi!"+i; //消息发送 /** * 消息发送 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage(不写就填空串) * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性(没有填null) * 参数4:消息内容(消息内容是字符串,需要转换成字节数组才能传输) */ channel.basicPublish("", "qianyi2", null, message.getBytes()); } //关闭资源(连接和频道的) channel.close(); connection.close(); } }
com.xxx.work.ConsumeOne
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //创建频道(创建连接rabbitmq服务器之间的稳定、高效的频道,持久通讯) Channel channel = connection.createChannel(); //申明队列,指定到哪个队列获取消息 /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi2",true,false,false,null); //创建消费者,并设置消息处理(DefaultConsumer:消息消费者,参数传入创建的频道)然后再重写handleDelivery方法,可以用lambab表达式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body, "UTF-8"); //输出获得的消息内容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息监听 /** * 消息监听 * 要监听哪个队列?当消费者收到消息之后是否自动告诉rebbitmq服务器已经收到?收到消息之后,如何处理呢? * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调(传入上面创建的消费者对象,这个消费者对象中对做了对收到的消息处理) */ channel.basicConsume("qianyi2",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); } }
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
Queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange(交换机)角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接受者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
1.每个消费者监听自己的队列。
2.生产者将消息发给broker(代理人),由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
(1)生产者
生产者需要注意如下3点:
1.声明交换机
2.声明队列
3.队列需要绑定指定的交换机
生产者:申明一个交换机,然后绑定这个交换机所有的(根据需求)队列,发送消息即可
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具类,将之前的一直重复代码截取到这个工具类中 Connection connection = QueueUtil.queueUtil(); //创建频道(在本身和指定的rabbitmq服务器中的指定虚拟主机之间建立稳定、快速的通道) Channel channel = connection.createChannel(); /** * 声明交换机 * 参数1:交换机名称 * 参数2:交换机类型,fanout、topic、direct、headers(以下用fanout类型,广播模式,每个与交换机绑定的队列都会接收到信息) */ channel.exchangeDeclare("QY", BuiltinExchangeType.FANOUT); //声明队列(说明要在rabbitmq服务器中指定的虚拟主机中的哪条消息队列) /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi3",true,false,false,null); channel.queueDeclare("qianyi4",true,false,false,null); //队列绑定交换机 //参数1:需要绑定的队列 //参数2:需要绑定的交换机 channel.queueBind("qianyi3","QY",""); channel.queueBind("qianyi4","QY",""); //创建消息(中文会乱码) String message = "发布订阅模式:欢迎光临红浪漫!"; //消息发送 /** * 消息发送 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage(不写就填空串) * 参数2:路由key,简单模式可以传递队列名称,发布订阅模式不传递队列名称 * 参数3:消息其它属性(没有填null) * 参数4:消息内容(消息内容是字符串,需要转换成字节数组才能传输) */ channel.basicPublish("QY", "", null, message.getBytes()); //关闭资源(连接和频道的) channel.close(); connection.close(); } }
消费者:消费者可以是多个,只要监听的队列跟交换机绑定了,那么生产者发送的内容这个消费者都能收到
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //创建频道(创建连接rabbitmq服务器之间的稳定、高效的频道,持久通讯) Channel channel = connection.createChannel(); //申明队列,指定到哪个队列获取消息 /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi3",true,false,false,null); //创建消费者,并设置消息处理(DefaultConsumer:消息消费者,参数传入创建的频道)然后再重写handleDelivery方法,可以用lambab表达式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body, "UTF-8"); //输出获得的消息内容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息监听 /** * 消息监听 * 要监听哪个队列?当消费者收到消息之后是否自动告诉rebbitmq服务器已经收到?收到消息之后,如何处理呢? * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调(传入上面创建的消费者对象,这个消费者对象中对做了对收到的消息处理) */ channel.basicConsume("qianyi3",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); } }
总结:生产者发送的消息先发给申明的交换机,交换机又绑定了一个或者多个队列,那么在这个模式下,消费者只需要监视跟交换机绑定的队列,就可以获取到生产者发送的消息。
如果两个或者多个消费者监视同一个队列,那么又会出现这种情况:即生产者发送一条消息,能同时接收到信息的只有一个消费者,无法做到上面的效果,多个消费者同时收到消息
所以如果需要同时接收消息的话,必须一个消费者监听一条队列,而该队列必须跟交换机有绑定的关系
发布订阅模式与work队列模式的区别:
1、work队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,work队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,work队列模式不需要设置,实际上work队列模式会将队列绑 定到默认的交换机
路由模式特点:
1.队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2.消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
P:生产者,向Exchange发送消息,交换机绑定队列的时候,会指定一个routing key,给交换机发送消息的时候,也要带着指定的routing key,并且有几个routing key就发送几次(一次只能指定一个routing key)
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
创建消息生产者,代码如下:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具类,将之前的一直重复代码截取到这个工具类中 Connection connection = QueueUtil.queueUtil(); //创建频道(在本身和指定的rabbitmq服务器中的指定虚拟主机之间建立稳定、快速的通道) Channel channel = connection.createChannel(); /** * 声明交换机 * 参数1:交换机名称 * 参数2:交换机类型,fanout、topic、direct、headers(以下用DIRECT类型,路由模式,交换机发送的消息会根routing key发送给匹配的队列) */ channel.exchangeDeclare("QY1", BuiltinExchangeType.DIRECT); //声明队列(说明要在rabbitmq服务器中指定的虚拟主机中的哪条消息队列) /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi5",true,false,false,null); channel.queueDeclare("qianyi6",true,false,false,null); //队列绑定交换机 //参数1:需要绑定的队列 //参数2:需要绑定的交换机 //参数3:需要绑定的routing key(路由key)在交换机给队列发送消息的时候,会根据它发送 channel.queueBind("qianyi5","QY1","rouingkey1"); channel.queueBind("qianyi6","QY1","rouingkey2"); //创建消息(中文会乱码) String message1 = "发布订阅模式:欢迎光临红浪漫!111"; String message2 = "发布订阅模式:欢迎光临红浪漫!222"; //消息发送 /** * 消息发送 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage(不写就填空串) * 参数2:路由key,简单模式可以传递队列名称,广播模式不传递队列名称 * 参数3:消息其它属性(没有填null) * 参数4:消息内容(消息内容是字符串,需要转换成字节数组才能传输) */ channel.basicPublish("QY1", "rouingkey1", null, message1.getBytes()); channel.basicPublish("QY1", "rouingkey2", null, message2.getBytes()); //关闭资源(连接和频道的) channel.close(); connection.close(); } }
消费者:
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //创建频道(创建连接rabbitmq服务器之间的稳定、高效的频道,持久通讯) Channel channel = connection.createChannel(); //申明队列,指定到哪个队列获取消息 /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi6",true,false,false,null); //创建消费者,并设置消息处理(DefaultConsumer:消息消费者,参数传入创建的频道)然后再重写handleDelivery方法,可以用lambab表达式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body, "UTF-8"); //输出获得的消息内容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息监听 /** * 消息监听 * 要监听哪个队列?当消费者收到消息之后是否自动告诉rebbitmq服务器已经收到?收到消息之后,如何处理呢? * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调(传入上面创建的消费者对象,这个消费者对象中对做了对收到的消息处理) */ channel.basicConsume("qianyi6",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); } }
总结:当在生产者中绑定交换机的时候,每绑定一个队列,都会给该队列指定一个routing key,然后在生产者向交换机发送消息的时候,指定某个已经绑定的routing key,就会将该条消息发送到对应的队列,比如A队列绑定了routingkey1,生产者发送消息给队列发送消息的时候,就会去队列中找routingkey1的队列,发送过去,消费者只需要根据监听的队列就可以获得该条消息。
可以做到这样:申明两个队列,分别指定routingkey,发送消息的时候也发送两条,一条消息指定其中一个routingkey,也就将那条消息发送给了一个队列,另一条消息指定另一个routingkey,消息也就发送到了另一个队列,两个或者多个消费者只需要绑定不同的队列就可以获得两个不同的消息。
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: qianyi.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
图解:
红色Queue:绑定的是usa.#
,因此凡是以 usa.
开头的routing key
都会被匹配到
黄色Queue:绑定的是#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配
使用topic类型的Exchange,发送消息的routing key有3种: item.insert
、item.update
、item.delete
:
创建TopicProducer实现消息生产,代码如下:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具类,将之前的一直重复代码截取到这个工具类中 Connection connection = QueueUtil.queueUtil(); //创建频道(在本身和指定的rabbitmq服务器中的指定虚拟主机之间建立稳定、快速的通道) Channel channel = connection.createChannel(); /** * 声明交换机 * 参数1:交换机名称 * 参数2:交换机类型,fanout、topic、direct、headers(以下用DIRECT类型,路由模式,交换机发送的消息会根routing key发送给匹配的队列) */ channel.exchangeDeclare("QY2", BuiltinExchangeType.TOPIC); //声明队列(说明要在rabbitmq服务器中指定的虚拟主机中的哪条消息队列) /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi7",true,false,false,null); channel.queueDeclare("qianyi8",true,false,false,null); channel.queueDeclare("qianyi9",true,false,false,null); //队列绑定交换机 //参数1:需要绑定的队列 //参数2:需要绑定的交换机 //参数3:需要绑定的routing key(路由key)在交换机给队列发送消息的时候,会根据它发送 // (*表示后面一个单词无论是什么,只要交换机发送消息的routingkey的值是item开头的,它都能接收到) //下面操作给qianyi7队列两个routing key,只要发送的消息指定其中任何一个routingkey,qianyi7都会接收到消息 //给qianyi8队列用了通配符,只要发送的消息指定的routing key是以item开头的,它都能收到 channel.queueBind("qianyi7","QY2","item.inset"); channel.queueBind("qianyi7","QY2","item.update"); channel.queueBind("qianyi8","QY2","item.*"); //创建消息(中文会乱码) String message1 = "发布订阅模式:欢迎光临红浪漫!111"; String message2 = "发布订阅模式:欢迎光临红浪漫!222"; String message3 = "发布订阅模式:欢迎光临红浪漫!333"; //消息发送 /** * 消息发送 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage(不写就填空串) * 参数2:路由key,简单模式可以传递队列名称,广播模式不传递队列名称 * 参数3:消息其它属性(没有填null) * 参数4:消息内容(消息内容是字符串,需要转换成字节数组才能传输) */ channel.basicPublish("QY2", "item.inset", null, message1.getBytes()); channel.basicPublish("QY2", "item.update", null, message2.getBytes()); //这里routing key值写item.aaa为了验证只要是以item开头的队列,都可以接收到这条消息 channel.basicPublish("QY2", "item.aaa", null, message3.getBytes()); //关闭资源(连接和频道的) channel.close(); connection.close(); } }
消费者1:
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //创建频道(创建连接rabbitmq服务器之间的稳定、高效的频道,持久通讯) Channel channel = connection.createChannel(); //申明队列,指定到哪个队列获取消息 /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi7",true,false,false,null); //创建消费者,并设置消息处理(DefaultConsumer:消息消费者,参数传入创建的频道)然后再重写handleDelivery方法,可以用lambab表达式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body, "UTF-8"); //输出获得的消息内容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息监听 /** * 消息监听 * 要监听哪个队列?当消费者收到消息之后是否自动告诉rebbitmq服务器已经收到?收到消息之后,如何处理呢? * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调(传入上面创建的消费者对象,这个消费者对象中对做了对收到的消息处理) */ channel.basicConsume("qianyi7",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); } }
消费者2:
public class ConsumeTwo { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //创建频道(创建连接rabbitmq服务器之间的稳定、高效的频道,持久通讯) Channel channel = connection.createChannel(); //申明队列,指定到哪个队列获取消息 /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接(其它连接是否能连接到本条队列) * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("qianyi8",true,false,false,null); //创建消费者,并设置消息处理(DefaultConsumer:消息消费者,参数传入创建的频道)然后再重写handleDelivery方法,可以用lambab表达式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body, "UTF-8"); //输出获得的消息内容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息监听 /** * 消息监听 * 要监听哪个队列?当消费者收到消息之后是否自动告诉rebbitmq服务器已经收到?收到消息之后,如何处理呢? * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调(传入上面创建的消费者对象,这个消费者对象中对做了对收到的消息处理) */ channel.basicConsume("qianyi8",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); } }
总结:以上代码验证了:1、可以给一个队列指定多个routing key,只要消息发送给多个routing key中的任何一个,该队列都会收到消息。
2、可以给routing key用通配符(*或者#)使用item.*,那么只要发送消息的时候,指定的routing key是以item开头都可以被该队列收到。
所以,以上代码,qianyi7队列收到了两条消息,一条是item.inset路由key接收的,一条是item.update路由key接收的,而qiani8则收到了3条信息,因为每条信息的路由key都符合item.*的规则。
RabbitMQ工作模式: 1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
5、通配符模式 Topic 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp
尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
生产者工程:
application.yml文件配置RabbitMQ相关信息;
在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
消费者工程:
application.yml文件配置RabbitMQ相关信息
创建消息处理类,用于接收队列中的消息并进行处理
创建生产者工程,添加依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!--父工程--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <groupId>com.xxx</groupId> <artifactId>springboot-rabbitmq-producer</artifactId> <version>1.0-SNAPSHOT</version> <!--依赖--> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
(1)application.yml配置文件
创建application.yml,内容如下:
spring: rabbitmq: host: localhost port: 5672 virtual-host: /aaa #虚拟主机名 username: guest password: guest
绑定交换机和队列
创建RabbitMQ队列与交换机绑定的配置类RabbitMQConfig,代码如下:
@Configuration public class RabbitMQConfig { /*** * 声明交换机 */ @Bean(name = "itemTopicExchange") public Exchange topicExchange(){ return ExchangeBuilder.topicExchange("item_topic_exchange").durable(true).build(); } /*** * 声明队列 */ @Bean(name = "itemQueue") public Queue itemQueue(){ return QueueBuilder.durable("item_queue").build(); } /*** * 队列绑定到交换机上 */ @Bean public Binding itemQueueExchange(@Qualifier("itemQueue")Queue queue, @Qualifier("itemTopicExchange")Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); } }
创建消费者工程,添加依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!--父工程--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <groupId>com.xxx</groupId> <artifactId>springboot-rabbitmq-consumer</artifactId> <version>1.0-SNAPSHOT</version> <!--依赖--> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project>
启动类:略
配置文件与上相同,略
编写消息监听器com.itheima.listener.MessageListener,代码如下:
@Component public class MessageListener { /** * 监听某个队列的消息 * @param message 接收到的消息 */ @RabbitListener(queues = "item_queue") public void myListener1(String message){ System.out.println("消费者接收到的消息为:" + message); } }
+ confirm模式
生产者发送消息到交换机的时机
+ return模式
交换机转发消息给queue的时机
1.生产者发送消息到交换机
2.交换机根据routingkey 转发消息给队列
3.消费者监控队列,获取队列中信息
4.消费成功删除队列中的消息
实现:
先创建工程,添加依赖:(web依赖用于测试,test依赖无所谓,以及amqp的依赖,amqp是一种协议,里面集成了rabbitMQ的相关需要依赖)
<parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.1.3.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
2、创建启动类,在启动类里面创建队列、交换机、绑定的对象,并指定各自的名字(对象名随意,可指定可不指定,下面代码指定了,在bean注解后面)
@SpringBootApplication public class RabbitMQDemo01Application { public static void main(String[] args) { SpringApplication.run(RabbitMQDemo01Application.class,args); } //创建队列 @Bean(name = "queue_demo01") public Queue createQueue(){ //Q_demo01指定的是队列名 return new Queue("Q_demo01"); } //创建交换机,直接使用directExchange(路由模式)它的父类也实现了Exchange接口 @Bean(name = "exchange_demo01") public DirectExchange createDirectExchange(){ //E_demo01指定的是交换机名 return new DirectExchange("E_demo01"); } //创建绑定对象,将交换机和队列绑定 @Bean public Binding createBinDing(){ //将上面创建的队列和交换机进行绑定,然后设置这个队列接收哪些匹配的路由key:demo01 return BindingBuilder.bind(createQueue()).to(createDirectExchange()).with("demo01.01"); } }
3、创建配置文件,主要配置该微服务的端口号、rabbitMQ服务器的IP地址和端口号,以及rabbitMQ的用户名和密码、是否启动confirm模式
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,默认关闭 server: port: 8881
4、创建controller模拟接收到前端信息后,给rabbitMQ服务器发送消息,这个消息最终需要另一个微服务(消费者)接收,因为有confirm模式,所以在发消息之前需要先设置回调函数,当rabbitMQ中的交换机收到消息就会调用的函数(方法)
@RestController @RequestMapping("/demo01") public class Demo01Controller { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallback confirmCallback; @RequestMapping("/test01") public String demo01(){ System.out.println("接收请求"); System.out.println("处理请求中……"); //设置回调函数(当发送消息后,接收方会返回发送发调用方法,这个方法调用就能知道消息发送结果) rabbitTemplate.setConfirmCallback(confirmCallback); //发送消息 rabbitTemplate.convertAndSend("E_demo01","demo01.01","demo01……"); return "ok"; } }
5、编写这个回调函数的具体方法,这个回调函数实现rabbitTemplate.confarmCallback接口,重写方法即可,具体实现看注释(我这里因为没有下载源码,所以自动生成的代码,参数就变成了b、s这些不好读,下载源码即可)
//需要交给spring核心容器管理 //回调函数要实现rabbitTemplate中的confirmCallback接口 @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { /** * * @param correlationData 消息信息 * @param b 确认标识:true,MQ服务器exchange表示已经确认收到消息 false 表示没有收到消息 * @param s 如果没有收到消息,则指定为MQ服务器exchange消息没有收到的原因,如果已经收到则指定为null */ if (b){ System.out.println("消息收到,内容为:"+correlationData); }else { System.out.println("消息未收到,原因为:"+s); } } }
然后就可以发送请求到编写的controller类了,controller类接收到请求后会发消息到交换机上,交换机如果接收到消息,就会调用回调函数,如果发送消息时,故意填写一个错误的交换机,并且这个错误的交换机是不存在的话,那么当消息一发送,没有找到对应的交换机,在调用回调函数的时候就会进入没有接收消息的判断中,这样就可以确定消息到底有没有发送成功。
如上,已经实现了消息发送到交换机上的内容,但是如果是,交换机发送成功,但是在路由转发到队列的时候,发送错误,此时就需要用到returncallback模式了。接下来我们实现下。
实现步骤如下:
1.开启returncallback模式
2.设置回调函数
3.发送消息
配置yml开启returncallback:在配置文件中开启:
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,默认关闭 publisher-returns: true #配置returns模式,默认关闭 server: port: 8881
编写returns回调函数:这个回调函数跟confirm的回调函数一脉相承,都差不多
@Component public class ReturnsCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { /** * * @param message 消息信息,因为message传递过来是字节,所以需要转换成字符串 * @param i 退回的状态码 * @param s 退回的信息 * @param s1 交换机 * @param s2 路由key */ System.out.println("退回的消息是:"+new String(message.getBody())); System.out.println("退回的状态码是:"+i); System.out.println("退回的信息是:"+s); System.out.println("退回的交换机是:"+s1); System.out.println("退回的路由key是:"+s2); } }
还需要在controller里面加一行代码,就是发送消息之前设置returns的回调函数:
@RestController @RequestMapping("/demo01") public class Demo01Controller { @Autowired private RabbitTemplate rabbitTemplate; //需要注入刚刚创建的confirm回调函数 @Autowired private ConfirmCallback confirmCallback; //需要注入刚刚创建的returns回调函数 @Autowired private ReturnsCallback returnsCallback; @RequestMapping("/test01") public String demo01(){ System.out.println("接收请求"); System.out.println("处理请求中……"); //设置confirm回调函数(当发送消息后,接收方会返回发送发调用方法,这个方法调用就能知道消息发送结果) rabbitTemplate.setConfirmCallback(confirmCallback); //设置returns的回调函数(当交换机收到消息发送给队列后,,队列就会调用这个回调函) rabbitTemplate.setReturnCallback(returnsCallback); //发送消息 rabbitTemplate.convertAndSend("E_demo01","demo01.01","demo01……"); return "ok"; } }
总结起来也就是新增加了三个步骤,第一个在配置文件中开启returnsCallback,第二个写一个returnsCallback的回调函数,第三个在发送消息之前指定好回调函数,这样就完成了从生产者发消息到交换机,交换机会调用回调函数,从交换机发消息到队列,队列会调用回调函数,确保了这三个点发送消息不会有问题,唯一的小区别就是,交换机接收到消息,判定是否接收到的条件可以是错误内容,如果错误内容为null则说明交换机接收到信息且没有异常,队列是否接收到消息的判断条件可以是状态码,如下图,当发送消息时指定不存在的routting key(路由key),那么打印的消息就会如下:
+ returncallback模式,需要手动设置开启
+ 该模式 指定 在路由的时候发送错误的时候调用回调函数,不影响消息发送到交换机
但是一般情况下我们使用confirm即可,因为路由key 由开发人员指定,一般不会出现错误,并且从交换机到队列,都是在rabbitMQ服务器中进行的,除非服务器挂掉,否则不会出问题。如果要保证消息在交换机和routingkey的时候那么需要结合两者的方式来进行设置。
上边我们学习了发送方的可靠性投递,但是在消费方也有可能出现问题,比如没有接受消息,比如接受到消息之后,在代码执行过程中出现了异常,这种情况下我们需要额外的处理,那么就需要手动进行确认签收消息。rabbtimq给我们提供了一个机制:ACK机制。
ACK机制:有三种方式
手动确认 acknowledge="manual"
根据异常情况来确认 acknowledge="auto"
其中自动确认是指:
当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
其中手动确认方式是指:
则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()等方法,让其按照业务功能进行处理,比如:重新发送,比如拒绝签收进入死信队列等等。
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,默认关闭 publisher-returns: true #配置returns模式,默认关闭 listener: simple: acknowledge-mode: manual #设置监听端消息ACK确认模式为手动模式,默认自动确认接收消息,无论是否出异常 server: port: 8881
2.创建消息监听器监听消息:监听队列,接收消息,然后用cry/catch来判定是否接收消息,如果没有异常,则接收消息,并打印,如果有异常,则可以选择将消息返回给队列或者丢弃消息
@Component //指定需要监听的队列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 频道对象 他提供了ack/nack方法(签收和拒绝签收) * Message 消息本生的封装的对象 * String msg 消息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收消息 System.out.println("消费者接收到的消息:"+msg); try { //处理本地业务 System.out.println("处理本地业务开始======start======"); Thread.sleep(2000); //模拟接收消息出错 //int i = 1 / 0; //签收消息 // 参数1 指定的是消息的序号(快递号) // 参数2 指定是否需要批量的签收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出现异常,则拒绝消息 可以重回队列 也可以丢弃 可以根据业务场景来 //方式一:可以批量处理y用:basicNack,传三个参数 //参数3 标识是否重回队列 true 是重回 false 就是不重回:丢弃消息,如果重回队列的话,异常没有解决,就会进入死循环 //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); //方式二:不批量处理:basicReject,传两个参数,第二个参数是否批量 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
按以上代码,如果消费端没有出现异常,则会正常接收消息,如果出现了异常,说明这个消费端的实施业务逻辑失败,则必须告诉交换机,任务失败,交易取消,可以选择将消息返回给交换机,或者丢弃这个消息,返回给交换机,那么消息还会存放在交换机,但是交换机又会重新将返回的消息发送给消费端,消费的又出现异常,再返回给交换机,形成死循环。
以下为各种情况代码结果演示:
1、配置文件开启ACK手动确认模式,但是在消费端没有写代码确认接收,也没有拒绝接收,消费端代码如下:
@Component @RabbitListener(queues = "queue_demo01") public class MyRabbitListener { /*@RabbitHandler public void msg(String message) { System.out.println("消费Duang接收消息:" + message); }*/ @RabbitHandler public void msg(Message message, Channel channel ,String msg) { System.out.println("消费Duang接收消息:" + msg); } }
那么执行的结果就会是这样:
说明一直没有被签收,消息一直会在rabbitMQ服务器
2、配置文件开启ACK手动确认模式,消费端出现异常,消息接收被拒绝后执行丢弃消息操作,消费端代码如下:
@Component //指定需要监听的队列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 频道对象 他提供了ack/nack方法(签收和拒绝签收) * Message 消息本生的封装的对象 * String msg 消息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收消息 System.out.println("消费者接收到的消息:"+msg); try { //处理本地业务 System.out.println("处理本地业务开始======start======"); Thread.sleep(2000); //模拟接收消息出错 int i = 1 / 0; //签收消息 // 参数1 指定的是消息的序号(快递号) // 参数2 指定是否需要批量的签收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出现异常,则拒绝消息 可以重回队列 也可以丢弃 可以根据业务场景来 //方式一:可以批量处理y用:basicNack,传三个参数 //参数3 标识是否重回队列 true 是重回 false 就是不重回:丢弃消息,如果重回队列的话,异常没有解决,就会进入死循环 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); //方式二:不批量处理:basicReject,传两个参数,第二个参数是否批量 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
消息丢弃后,则不再出现:
3、配置文件开启ACK手动确认模式,消费端出现异常,拒绝接收消息,然后将消息返回给队列,代码如下(第三个参数设置为重回队列进行再次投递):
@Component //指定需要监听的队列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 频道对象 他提供了ack/nack方法(签收和拒绝签收) * Message 消息本生的封装的对象 * String msg 消息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收消息 System.out.println("消费者接收到的消息:"+msg); try { //处理本地业务 System.out.println("处理本地业务开始======start======"); Thread.sleep(2000); //模拟接收消息出错 int i = 1 / 0; //签收消息 // 参数1 指定的是消息的序号(快递号) // 参数2 指定是否需要批量的签收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出现异常,则拒绝消息 可以重回队列 也可以丢弃 可以根据业务场景来 //方式一:可以批量处理y用:basicNack,传三个参数 //参数3 标识是否重回队列 true 是重回 false 就是不重回:丢弃消息,如果重回队列的话,异常没有解决,就会进入死循环 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); //方式二:不批量处理:basicReject,传两个参数,第二个参数是否批量 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
消息返回队列后,会再次给消费端投递该消息,异常不消失,死循环不停。
总结:
第一种:签收
channel.basicAck()
第二种:拒绝签收 批量处理
channel.basicNack()
第三种:拒绝签收 不批量处理
channel.basicReject()
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
如何保证消息的高可靠性传输?
1.持久化(如果使用spring boot,则持久化的默认设置就是true,不需要额外进行设置)
? exchange要持久化
? queue要持久化
? message要持久化
2.生产方确认Confirm、Return
3.消费方确认Ack
如果并发量大的情况下,生产方不停的发送消息,可能处理不了那么多消息,此时消息在队列中堆积很多,当消费端启动,瞬间就会涌入很多消息,消费端有可能瞬间垮掉,这时我们可以在消费端进行限流操作,每秒钟放行多少个消息。这样就可以进行并发量的控制,减轻系统的负载,提供系统的可用性,这种效果往往可以在秒杀和抢购中进行使用。在rabbitmq中也有限流的一些配置。
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,默认关闭 publisher-returns: true #配置returns模式,默认关闭 listener: simple: acknowledge-mode: manual #设置监听端消息ACK确认模式为手动模式 prefetch: 1 #设置每一个消费端,可以同时处理的未确认的消息最大数量 server: port: 8881
这个限流默认是250个。
RabbitMQ设置过期时间有两种:
针对某一个队列设置过期时间 ;队列中的所有消息在过期时间到之后,如果没有被消费则被全部清除
针对某一个特定的消息设置过期时间;队列中的消息设置过期时间之后,如果这个消息没有被消息则被清除。
需要注意一点的是:
针对某一个特定的消息设置过期时间时,一定是消息在队列中在队头的时候进行计算,如果某一个消息A 设置过期时间5秒,消息B在队头,消息B没有设置过期时间,B此时过了已经5秒钟了还没被消费。注意,此时A消息并不会被删除,因为它并没有再队头。
一般在工作当中,单独使用TTL的情况较少。后面会讲到延时队列。在这里有用处。
设置过期队列,只需要在创建队列的时候指定一下就可以了:
@SpringBootApplication public class RabbitMQDemo01Application { public static void main(String[] args) { SpringApplication.run(RabbitMQDemo01Application.class,args); } /*创建定时过期队列,使用构建者模式,durable("Q_demo01"):设置队列名 withArgument("x-message",100)第一个参数后面讲,第二个是过期时间,单位毫秒*/ @Bean(name = "queue_demo01") public Queue createQueue(){ //Q_demo01指定的是队列名 return QueueBuilder.durable("Q_demo01").withArgument("x-message",100).build(); } //创建交换机,直接使用directExchange(路由模式)它的父类也实现了Exchange接口 @Bean(name = "exchange_demo01") public DirectExchange createDirectExchange(){ //E_demo01指定的是交换机名 return new DirectExchange("E_demo01"); } //创建绑定对象,将交换机和队列绑定 @Bean public Binding createBinDing(){ //将上面创建的队列和交换机进行绑定,然后设置这个队列接收哪些匹配的路由key:demo01 return BindingBuilder.bind(createQueue()).to(createDirectExchange()).with("demo01.01"); } }
如下图的过程:
成为死信的三种条件:
队列中消息的长度(数量)到达限制;
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(丢弃)
原队列存在消息过期设置,消息到达超时时间未被消费;(ddl设置的过期的时间到了)
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中的消息做相应的处理。(例如客户下订单,进入支付页面,这个时候商品库存已经在数据库中进行减数操作,如果客户突然不执行支付操作,那么就可以设置定时消息,如果超过时间,客户没有进行支付,则将这个死信消息放入死信交换机,发送给与私信交换机绑定的队列中,用另一个消费端接收这个死信消息,这个消费端就是执行将库存数量重新加回来的操作)
刚才说到死信队列也是一个正常的exchange.只需要设置一些参数即可。
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key。
如上图所示
1.创建queue1 正常队列 用于接收死信队列过期之后转发过来的消息
2.创建queue2 可以针对他进行参数设置 死信队列
3.创建交换机 死信交换机
4.绑定正常队列到交换机
rabbitMQ五种模式使用方式以及springboot整合rabbitMQ的使用
原文:https://www.cnblogs.com/qianyi525/p/13916410.html